blob: 91f7a01de8ad6cab3a6285b6058602c5ef4f7e7f [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Synchronization primitives."""
2
Yury Selivanov6370f342017-12-10 18:36:12 -05003__all__ = ('Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
5import collections
Andrew Svetlov28d8d142017-12-09 20:00:05 +02006import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007
8from . import events
9from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020010from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
12
Guido van Rossumab3c8892014-01-25 16:51:57 -080013class _ContextManager:
14 """Context manager.
15
16 This enables the following idiom for acquiring and releasing a
17 lock around a block:
18
19 with (yield from lock):
20 <block>
21
22 while failing loudly when accidentally using:
23
24 with lock:
25 <block>
Andrew Svetlov88743422017-12-11 17:35:49 +020026
27 Deprecated, use 'async with' statement:
28 async with lock:
29 <block>
Guido van Rossumab3c8892014-01-25 16:51:57 -080030 """
31
32 def __init__(self, lock):
33 self._lock = lock
34
35 def __enter__(self):
36 # We have no use for the "as ..." clause in the with
37 # statement for locks.
38 return None
39
40 def __exit__(self, *args):
41 try:
42 self._lock.release()
43 finally:
44 self._lock = None # Crudely prevent reuse.
45
46
Yury Selivanovd08c3632015-05-13 15:15:56 -040047class _ContextManagerMixin:
48 def __enter__(self):
49 raise RuntimeError(
50 '"yield from" should be used as context manager expression')
51
52 def __exit__(self, *args):
53 # This must exist because __enter__ exists, even though that
54 # always raises; that's how the with-statement works.
55 pass
56
57 @coroutine
58 def __iter__(self):
59 # This is not a coroutine. It is meant to enable the idiom:
60 #
61 # with (yield from lock):
62 # <block>
63 #
64 # as an alternative to:
65 #
66 # yield from lock.acquire()
67 # try:
68 # <block>
69 # finally:
70 # lock.release()
Andrew Svetlov88743422017-12-11 17:35:49 +020071 # Deprecated, use 'async with' statement:
72 # async with lock:
73 # <block>
Andrew Svetlov28d8d142017-12-09 20:00:05 +020074 warnings.warn("'with (yield from lock)' is deprecated "
75 "use 'async with lock' instead",
76 DeprecationWarning, stacklevel=2)
Yury Selivanovd08c3632015-05-13 15:15:56 -040077 yield from self.acquire()
78 return _ContextManager(self)
79
Andrew Svetlov5f841b52017-12-09 00:23:48 +020080 async def __acquire_ctx(self):
81 await self.acquire()
Victor Stinner3f438a92017-11-28 14:43:52 +010082 return _ContextManager(self)
Yury Selivanovd08c3632015-05-13 15:15:56 -040083
Andrew Svetlov5f841b52017-12-09 00:23:48 +020084 def __await__(self):
Andrew Svetlov28d8d142017-12-09 20:00:05 +020085 warnings.warn("'with await lock' is deprecated "
86 "use 'async with lock' instead",
87 DeprecationWarning, stacklevel=2)
Andrew Svetlov5f841b52017-12-09 00:23:48 +020088 # To make "with await lock" work.
89 return self.__acquire_ctx().__await__()
90
91 async def __aenter__(self):
92 await self.acquire()
Victor Stinner3f438a92017-11-28 14:43:52 +010093 # We have no use for the "as ..." clause in the with
94 # statement for locks.
95 return None
Yury Selivanovd08c3632015-05-13 15:15:56 -040096
Andrew Svetlov5f841b52017-12-09 00:23:48 +020097 async def __aexit__(self, exc_type, exc, tb):
Victor Stinner3f438a92017-11-28 14:43:52 +010098 self.release()
Yury Selivanovd08c3632015-05-13 15:15:56 -040099
100
101class Lock(_ContextManagerMixin):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700102 """Primitive lock objects.
103
104 A primitive lock is a synchronization primitive that is not owned
105 by a particular coroutine when locked. A primitive lock is in one
106 of two states, 'locked' or 'unlocked'.
107
108 It is created in the unlocked state. It has two basic methods,
109 acquire() and release(). When the state is unlocked, acquire()
110 changes the state to locked and returns immediately. When the
111 state is locked, acquire() blocks until a call to release() in
112 another coroutine changes it to unlocked, then the acquire() call
113 resets it to locked and returns. The release() method should only
114 be called in the locked state; it changes the state to unlocked
115 and returns immediately. If an attempt is made to release an
116 unlocked lock, a RuntimeError will be raised.
117
118 When more than one coroutine is blocked in acquire() waiting for
119 the state to turn to unlocked, only one coroutine proceeds when a
120 release() call resets the state to unlocked; first coroutine which
121 is blocked in acquire() is being processed.
122
Andrew Svetlov88743422017-12-11 17:35:49 +0200123 acquire() is a coroutine and should be called with 'await'.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700124
Andrew Svetlov88743422017-12-11 17:35:49 +0200125 Locks also support the asynchronous context management protocol.
126 'async with lock' statement should be used.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700127
128 Usage:
129
130 lock = Lock()
131 ...
Andrew Svetlov88743422017-12-11 17:35:49 +0200132 await lock.acquire()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700133 try:
134 ...
135 finally:
136 lock.release()
137
138 Context manager usage:
139
140 lock = Lock()
141 ...
Andrew Svetlov88743422017-12-11 17:35:49 +0200142 async with lock:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700143 ...
144
145 Lock objects can be tested for locking state:
146
147 if not lock.locked():
Andrew Svetlov88743422017-12-11 17:35:49 +0200148 await lock.acquire()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700149 else:
150 # lock is acquired
151 ...
152
153 """
154
155 def __init__(self, *, loop=None):
156 self._waiters = collections.deque()
157 self._locked = False
158 if loop is not None:
159 self._loop = loop
160 else:
161 self._loop = events.get_event_loop()
162
163 def __repr__(self):
164 res = super().__repr__()
165 extra = 'locked' if self._locked else 'unlocked'
166 if self._waiters:
Yury Selivanov6370f342017-12-10 18:36:12 -0500167 extra = f'{extra}, waiters:{len(self._waiters)}'
168 return f'<{res[1:-1]} [{extra}]>'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700169
170 def locked(self):
Victor Stinnerc37dd612013-12-02 14:31:16 +0100171 """Return True if lock is acquired."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700172 return self._locked
173
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200174 async def acquire(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700175 """Acquire a lock.
176
177 This method blocks until the lock is unlocked, then sets it to
178 locked and returns True.
179 """
Guido van Rossum83f5a382016-08-23 09:39:03 -0700180 if not self._locked and all(w.cancelled() for w in self._waiters):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700181 self._locked = True
182 return True
183
Yury Selivanov7661db62016-05-16 15:38:39 -0400184 fut = self._loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700185 self._waiters.append(fut)
Miss Islington (bot)2b5937e2018-02-02 15:14:38 -0800186
187 # Finally block should be called before the CancelledError
188 # handling as we don't want CancelledError to call
189 # _wake_up_first() and attempt to wake up itself.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700190 try:
Miss Islington (bot)2b5937e2018-02-02 15:14:38 -0800191 try:
192 await fut
193 finally:
194 self._waiters.remove(fut)
Mathieu Sornay894a6542017-06-09 22:17:40 +0200195 except futures.CancelledError:
196 if not self._locked:
197 self._wake_up_first()
198 raise
Miss Islington (bot)2b5937e2018-02-02 15:14:38 -0800199
200 self._locked = True
201 return True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700202
203 def release(self):
204 """Release a lock.
205
206 When the lock is locked, reset it to unlocked, and return.
207 If any other coroutines are blocked waiting for the lock to become
208 unlocked, allow exactly one of them to proceed.
209
210 When invoked on an unlocked lock, a RuntimeError is raised.
211
212 There is no return value.
213 """
214 if self._locked:
215 self._locked = False
Mathieu Sornay894a6542017-06-09 22:17:40 +0200216 self._wake_up_first()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700217 else:
218 raise RuntimeError('Lock is not acquired.')
219
Mathieu Sornay894a6542017-06-09 22:17:40 +0200220 def _wake_up_first(self):
Miss Islington (bot)2b5937e2018-02-02 15:14:38 -0800221 """Wake up the first waiter if it isn't done."""
222 try:
223 fut = next(iter(self._waiters))
224 except StopIteration:
225 return
226
227 # .done() necessarily means that a waiter will wake up later on and
228 # either take the lock, or, if it was cancelled and lock wasn't
229 # taken already, will hit this again and wake up a new waiter.
230 if not fut.done():
231 fut.set_result(True)
Mathieu Sornay894a6542017-06-09 22:17:40 +0200232
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700233
234class Event:
Guido van Rossum994bf432013-12-19 12:47:38 -0800235 """Asynchronous equivalent to threading.Event.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700236
237 Class implementing event objects. An event manages a flag that can be set
238 to true with the set() method and reset to false with the clear() method.
239 The wait() method blocks until the flag is true. The flag is initially
240 false.
241 """
242
243 def __init__(self, *, loop=None):
244 self._waiters = collections.deque()
245 self._value = False
246 if loop is not None:
247 self._loop = loop
248 else:
249 self._loop = events.get_event_loop()
250
251 def __repr__(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700252 res = super().__repr__()
Guido van Rossumccea0842013-11-04 13:18:19 -0800253 extra = 'set' if self._value else 'unset'
254 if self._waiters:
Yury Selivanov6370f342017-12-10 18:36:12 -0500255 extra = f'{extra}, waiters:{len(self._waiters)}'
256 return f'<{res[1:-1]} [{extra}]>'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700257
258 def is_set(self):
Victor Stinnerc37dd612013-12-02 14:31:16 +0100259 """Return True if and only if the internal flag is true."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700260 return self._value
261
262 def set(self):
263 """Set the internal flag to true. All coroutines waiting for it to
264 become true are awakened. Coroutine that call wait() once the flag is
265 true will not block at all.
266 """
267 if not self._value:
268 self._value = True
269
270 for fut in self._waiters:
271 if not fut.done():
272 fut.set_result(True)
273
274 def clear(self):
275 """Reset the internal flag to false. Subsequently, coroutines calling
276 wait() will block until set() is called to set the internal flag
277 to true again."""
278 self._value = False
279
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200280 async def wait(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700281 """Block until the internal flag is true.
282
283 If the internal flag is true on entry, return True
284 immediately. Otherwise, block until another coroutine calls
285 set() to set the flag to true, then return True.
286 """
287 if self._value:
288 return True
289
Yury Selivanov7661db62016-05-16 15:38:39 -0400290 fut = self._loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700291 self._waiters.append(fut)
292 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200293 await fut
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700294 return True
295 finally:
296 self._waiters.remove(fut)
297
298
Yury Selivanovd08c3632015-05-13 15:15:56 -0400299class Condition(_ContextManagerMixin):
Guido van Rossum994bf432013-12-19 12:47:38 -0800300 """Asynchronous equivalent to threading.Condition.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700301
302 This class implements condition variable objects. A condition variable
303 allows one or more coroutines to wait until they are notified by another
304 coroutine.
Guido van Rossumccea0842013-11-04 13:18:19 -0800305
306 A new Lock object is created and used as the underlying lock.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700307 """
308
Andrew Svetlovf21fcd02014-07-26 17:54:34 +0300309 def __init__(self, lock=None, *, loop=None):
Guido van Rossumccea0842013-11-04 13:18:19 -0800310 if loop is not None:
311 self._loop = loop
312 else:
313 self._loop = events.get_event_loop()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700314
Andrew Svetlovf21fcd02014-07-26 17:54:34 +0300315 if lock is None:
316 lock = Lock(loop=self._loop)
317 elif lock._loop is not self._loop:
318 raise ValueError("loop argument must agree with lock")
319
Guido van Rossumccea0842013-11-04 13:18:19 -0800320 self._lock = lock
321 # Export the lock's locked(), acquire() and release() methods.
322 self.locked = lock.locked
323 self.acquire = lock.acquire
324 self.release = lock.release
325
326 self._waiters = collections.deque()
327
328 def __repr__(self):
329 res = super().__repr__()
330 extra = 'locked' if self.locked() else 'unlocked'
331 if self._waiters:
Yury Selivanov6370f342017-12-10 18:36:12 -0500332 extra = f'{extra}, waiters:{len(self._waiters)}'
333 return f'<{res[1:-1]} [{extra}]>'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700334
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200335 async def wait(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700336 """Wait until notified.
337
338 If the calling coroutine has not acquired the lock when this
339 method is called, a RuntimeError is raised.
340
341 This method releases the underlying lock, and then blocks
342 until it is awakened by a notify() or notify_all() call for
343 the same condition variable in another coroutine. Once
344 awakened, it re-acquires the lock and returns True.
345 """
Guido van Rossumccea0842013-11-04 13:18:19 -0800346 if not self.locked():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700347 raise RuntimeError('cannot wait on un-acquired lock')
348
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700349 self.release()
350 try:
Yury Selivanov7661db62016-05-16 15:38:39 -0400351 fut = self._loop.create_future()
Guido van Rossumccea0842013-11-04 13:18:19 -0800352 self._waiters.append(fut)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700353 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200354 await fut
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700355 return True
356 finally:
Guido van Rossumccea0842013-11-04 13:18:19 -0800357 self._waiters.remove(fut)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700358
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700359 finally:
Yury Selivanovc92bf832016-06-11 12:00:07 -0400360 # Must reacquire lock even if wait is cancelled
Miss Islington (bot)8caee0f2018-02-14 01:47:30 -0800361 cancelled = False
Yury Selivanovc92bf832016-06-11 12:00:07 -0400362 while True:
363 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200364 await self.acquire()
Yury Selivanovc92bf832016-06-11 12:00:07 -0400365 break
366 except futures.CancelledError:
Miss Islington (bot)8caee0f2018-02-14 01:47:30 -0800367 cancelled = True
368
369 if cancelled:
370 raise futures.CancelledError
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700371
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200372 async def wait_for(self, predicate):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700373 """Wait until a predicate becomes true.
374
375 The predicate should be a callable which result will be
376 interpreted as a boolean value. The final predicate value is
377 the return value.
378 """
379 result = predicate()
380 while not result:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200381 await self.wait()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700382 result = predicate()
383 return result
384
385 def notify(self, n=1):
386 """By default, wake up one coroutine waiting on this condition, if any.
387 If the calling coroutine has not acquired the lock when this method
388 is called, a RuntimeError is raised.
389
390 This method wakes up at most n of the coroutines waiting for the
391 condition variable; it is a no-op if no coroutines are waiting.
392
393 Note: an awakened coroutine does not actually return from its
394 wait() call until it can reacquire the lock. Since notify() does
395 not release the lock, its caller should.
396 """
Guido van Rossumccea0842013-11-04 13:18:19 -0800397 if not self.locked():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700398 raise RuntimeError('cannot notify on un-acquired lock')
399
400 idx = 0
Guido van Rossumccea0842013-11-04 13:18:19 -0800401 for fut in self._waiters:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700402 if idx >= n:
403 break
404
405 if not fut.done():
406 idx += 1
407 fut.set_result(False)
408
409 def notify_all(self):
410 """Wake up all threads waiting on this condition. This method acts
411 like notify(), but wakes up all waiting threads instead of one. If the
412 calling thread has not acquired the lock when this method is called,
413 a RuntimeError is raised.
414 """
Guido van Rossumccea0842013-11-04 13:18:19 -0800415 self.notify(len(self._waiters))
416
Guido van Rossumccea0842013-11-04 13:18:19 -0800417
Yury Selivanovd08c3632015-05-13 15:15:56 -0400418class Semaphore(_ContextManagerMixin):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700419 """A Semaphore implementation.
420
421 A semaphore manages an internal counter which is decremented by each
422 acquire() call and incremented by each release() call. The counter
423 can never go below zero; when acquire() finds that it is zero, it blocks,
424 waiting until some other thread calls release().
425
Serhiy Storchaka14867992014-09-10 23:43:41 +0300426 Semaphores also support the context management protocol.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700427
Guido van Rossum085869b2013-11-23 15:09:16 -0800428 The optional argument gives the initial value for the internal
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700429 counter; it defaults to 1. If the value given is less than 0,
430 ValueError is raised.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700431 """
432
Guido van Rossum085869b2013-11-23 15:09:16 -0800433 def __init__(self, value=1, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700434 if value < 0:
Guido van Rossum9c55a582013-11-21 11:07:45 -0800435 raise ValueError("Semaphore initial value must be >= 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700436 self._value = value
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700437 self._waiters = collections.deque()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700438 if loop is not None:
439 self._loop = loop
440 else:
441 self._loop = events.get_event_loop()
442
443 def __repr__(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700444 res = super().__repr__()
Yury Selivanov6370f342017-12-10 18:36:12 -0500445 extra = 'locked' if self.locked() else f'unlocked, value:{self._value}'
Guido van Rossumccea0842013-11-04 13:18:19 -0800446 if self._waiters:
Yury Selivanov6370f342017-12-10 18:36:12 -0500447 extra = f'{extra}, waiters:{len(self._waiters)}'
448 return f'<{res[1:-1]} [{extra}]>'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700449
Guido van Rossumd455a502015-09-29 11:54:45 -0700450 def _wake_up_next(self):
451 while self._waiters:
452 waiter = self._waiters.popleft()
453 if not waiter.done():
454 waiter.set_result(None)
455 return
456
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700457 def locked(self):
458 """Returns True if semaphore can not be acquired immediately."""
Guido van Rossumab3c8892014-01-25 16:51:57 -0800459 return self._value == 0
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700460
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200461 async def acquire(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700462 """Acquire a semaphore.
463
464 If the internal counter is larger than zero on entry,
465 decrement it by one and return True immediately. If it is
466 zero on entry, block, waiting until some other coroutine has
467 called release() to make it larger than 0, and then return
468 True.
469 """
Guido van Rossumd455a502015-09-29 11:54:45 -0700470 while self._value <= 0:
Yury Selivanov7661db62016-05-16 15:38:39 -0400471 fut = self._loop.create_future()
Guido van Rossumd455a502015-09-29 11:54:45 -0700472 self._waiters.append(fut)
473 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200474 await fut
Guido van Rossumd455a502015-09-29 11:54:45 -0700475 except:
476 # See the similar code in Queue.get.
477 fut.cancel()
478 if self._value > 0 and not fut.cancelled():
479 self._wake_up_next()
480 raise
481 self._value -= 1
482 return True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700483
484 def release(self):
485 """Release a semaphore, incrementing the internal counter by one.
486 When it was zero on entry and another coroutine is waiting for it to
487 become larger than zero again, wake up that coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700488 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700489 self._value += 1
Guido van Rossumd455a502015-09-29 11:54:45 -0700490 self._wake_up_next()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700491
Guido van Rossum085869b2013-11-23 15:09:16 -0800492
493class BoundedSemaphore(Semaphore):
494 """A bounded semaphore implementation.
495
496 This raises ValueError in release() if it would increase the value
497 above the initial value.
498 """
499
500 def __init__(self, value=1, *, loop=None):
501 self._bound_value = value
502 super().__init__(value, loop=loop)
503
504 def release(self):
505 if self._value >= self._bound_value:
506 raise ValueError('BoundedSemaphore released too many times')
507 super().release()