blob: 639bd11bd06838f9330a49c1c051380daa249925 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Synchronization primitives."""
2
Yury Selivanov6370f342017-12-10 18:36:12 -05003__all__ = ('Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
5import collections
Andrew Svetlov28d8d142017-12-09 20:00:05 +02006import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007
8from . import events
9from . import futures
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070010from . import exceptions
Victor Stinnerf951d282014-06-29 00:46:45 +020011from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
class _ContextManager:
    """Plain context manager that releases a wrapped lock on exit.

    It backs the legacy idiom for holding a lock around a block:

        with (yield from lock):
            <block>

    as opposed to the form that is meant to fail loudly:

        with lock:
            <block>

    Deprecated, use 'async with' statement:
        async with lock:
            <block>
    """

    def __init__(self, lock):
        self._lock = lock

    def __enter__(self):
        """Enter the block; a lock has nothing to bind via "as ...", so
        None is returned implicitly."""

    def __exit__(self, *args):
        # Detach the lock before releasing it: whether or not release()
        # raises, this object ends up without a lock and so cannot be
        # reused for a second with-block.
        held, self._lock = self._lock, None
        held.release()
46
47
class _ContextManagerMixin:
    """Mixin adding context-manager protocols to lock-like classes.

    The methods below call self.acquire() (awaited / yielded from) and
    self.release(), so the class using this mixin must provide both.

    Supported forms:

        async with lock:            # __aenter__ / __aexit__
            <block>

    Deprecated forms, which emit DeprecationWarning:

        with (yield from lock):     # __iter__
            <block>

        with await lock:            # __await__
            <block>

    A plain 'with lock:' always raises RuntimeError.
    """

    def __enter__(self):
        # A synchronous 'with lock:' can never acquire the lock, so fail
        # loudly instead of silently running the block unprotected.
        raise RuntimeError(
            '"yield from" should be used as context manager expression')

    def __exit__(self, *args):
        # This must exist because __enter__ exists, even though that
        # always raises; that's how the with-statement works.
        pass

    @coroutine
    def __iter__(self):
        # Despite the name, this does not iterate over anything.  It is a
        # generator (wrapped by @coroutine) that exists to enable the idiom:
        #
        #     with (yield from lock):
        #         <block>
        #
        # as an alternative to:
        #
        #     yield from lock.acquire()
        #     try:
        #         <block>
        #     finally:
        #         lock.release()
        #
        # Deprecated, use 'async with' statement:
        #     async with lock:
        #         <block>
        warnings.warn("'with (yield from lock)' is deprecated "
                      "use 'async with lock' instead",
                      DeprecationWarning, stacklevel=2)
        # Acquire first, then hand back a synchronous context manager
        # whose __exit__ releases the lock.
        yield from self.acquire()
        return _ContextManager(self)

    # Private (name-mangled) helper used only by __await__ below: acquire
    # the lock and return the context manager that will release it.
    async def __acquire_ctx(self):
        await self.acquire()
        return _ContextManager(self)

    def __await__(self):
        # The warning is emitted when 'await lock' is evaluated, before
        # the lock is acquired.
        warnings.warn("'with await lock' is deprecated "
                      "use 'async with lock' instead",
                      DeprecationWarning, stacklevel=2)
        # To make "with await lock" work.
        return self.__acquire_ctx().__await__()

    async def __aenter__(self):
        await self.acquire()
        # We have no use for the "as ..." clause in the with
        # statement for locks.
        return None

    async def __aexit__(self, exc_type, exc, tb):
        # Always release; returning None lets any exception raised in
        # the block propagate.
        self.release()
Yury Selivanovd08c3632015-05-13 15:15:56 -0400100
101
class Lock(_ContextManagerMixin):
    """Primitive lock for coroutines.

    The lock is not owned by any particular coroutine.  It is either
    'locked' or 'unlocked' and starts out unlocked.

    acquire() is a coroutine: on an unlocked lock it flips the state to
    locked and returns at once; on a locked lock it waits until a
    release() from another coroutine unlocks it, then takes the lock and
    returns.  release() must only be called on a locked lock; it unlocks
    and returns immediately.  Releasing an unlocked lock raises
    RuntimeError.

    When several coroutines are waiting in acquire(), a release() lets
    exactly one of them proceed, and it is the one that started waiting
    first.

    Locks support the asynchronous context management protocol, so the
    'async with lock' statement should be used.

    Usage:

        lock = Lock()
        ...
        await lock.acquire()
        try:
            ...
        finally:
            lock.release()

    Context manager usage:

        lock = Lock()
        ...
        async with lock:
            ...

    The locking state can be queried:

        if not lock.locked():
            await lock.acquire()
        else:
            # lock is acquired
            ...

    """

    def __init__(self, *, loop=None):
        self._waiters = collections.deque()
        self._locked = False
        # Fall back to the current event loop when none is supplied.
        self._loop = events.get_event_loop() if loop is None else loop

    def __repr__(self):
        state = 'locked' if self._locked else 'unlocked'
        if self._waiters:
            state = f'{state}, waiters:{len(self._waiters)}'
        # Splice the state into the default "<... object at 0x...>" form.
        base = super().__repr__()
        return f'<{base[1:-1]} [{state}]>'

    def locked(self):
        """Return True if lock is acquired."""
        return self._locked

    async def acquire(self):
        """Acquire the lock.

        Wait until the lock is unlocked, then mark it locked and
        return True.
        """
        # Fast path: the lock is free and nobody who is still interested
        # is queued ahead of us (cancelled waiters do not count).
        free_and_unclaimed = not self._locked and all(
            w.cancelled() for w in self._waiters)
        if free_and_unclaimed:
            self._locked = True
            return True

        waiter = self._loop.create_future()
        self._waiters.append(waiter)

        # The inner finally must run before the CancelledError handler:
        # our own future has to be out of the queue before
        # _wake_up_first() is called, or we would try to wake ourselves.
        try:
            try:
                await waiter
            finally:
                self._waiters.remove(waiter)
        except exceptions.CancelledError:
            # Pass the wake-up on if the lock is still free.
            if not self._locked:
                self._wake_up_first()
            raise

        self._locked = True
        return True

    def release(self):
        """Release the lock.

        A locked lock is reset to unlocked, and if other coroutines are
        waiting for it, exactly one of them is allowed to proceed.

        Raises RuntimeError when the lock is not locked.

        There is no return value.
        """
        if not self._locked:
            raise RuntimeError('Lock is not acquired.')
        self._locked = False
        self._wake_up_first()

    def _wake_up_first(self):
        """Wake up the first waiter if it isn't done."""
        if not self._waiters:
            return
        head = self._waiters[0]

        # A head future that is already done belongs to a waiter that will
        # wake up on its own later: it either takes the lock or, if it was
        # cancelled and the lock is still free, ends up back here and
        # wakes the next waiter.
        if not head.done():
            head.set_result(True)
Mathieu Sornay894a6542017-06-09 22:17:40 +0200233
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700234
class Event:
    """Asynchronous equivalent to threading.Event.

    An event wraps a boolean flag, initially false.  set() makes it
    true, clear() makes it false again, and wait() suspends the caller
    until the flag is true.
    """

    def __init__(self, *, loop=None):
        self._waiters = collections.deque()
        self._value = False
        # Fall back to the current event loop when none is supplied.
        self._loop = events.get_event_loop() if loop is None else loop

    def __repr__(self):
        state = 'set' if self._value else 'unset'
        if self._waiters:
            state = f'{state}, waiters:{len(self._waiters)}'
        # Splice the state into the default "<... object at 0x...>" form.
        base = super().__repr__()
        return f'<{base[1:-1]} [{state}]>'

    def is_set(self):
        """Return True if and only if the internal flag is true."""
        return self._value

    def set(self):
        """Set the internal flag to true and wake every coroutine that is
        waiting for it.  Coroutines calling wait() while the flag is true
        do not block at all.
        """
        if self._value:
            # Already set: there is nobody left to wake.
            return
        self._value = True

        for waiter in self._waiters:
            if not waiter.done():
                waiter.set_result(True)

    def clear(self):
        """Reset the internal flag to false.  Coroutines calling wait()
        afterwards block until set() makes the flag true again."""
        self._value = False

    async def wait(self):
        """Block until the internal flag is true.

        Return True immediately if the flag is already true; otherwise
        wait until another coroutine calls set(), then return True.
        """
        if self._value:
            return True

        waiter = self._loop.create_future()
        self._waiters.append(waiter)
        try:
            await waiter
        finally:
            # Leave the queue whether we were woken or cancelled.
            self._waiters.remove(waiter)
        return True
298
299
class Condition(_ContextManagerMixin):
    """Asynchronous equivalent to threading.Condition.

    A condition variable lets one or more coroutines wait until they
    are notified by another coroutine.

    An existing lock may be passed in as the underlying lock; when it is
    omitted a new Lock bound to the same loop is created.  A lock bound
    to a different loop is rejected with ValueError.
    """

    def __init__(self, lock=None, *, loop=None):
        # Fall back to the current event loop when none is supplied.
        self._loop = events.get_event_loop() if loop is None else loop

        if lock is None:
            lock = Lock(loop=self._loop)
        elif lock._loop is not self._loop:
            raise ValueError("loop argument must agree with lock")

        self._lock = lock
        # Expose the lock's own locked(), acquire() and release() as
        # instance attributes of the condition.
        self.locked = lock.locked
        self.acquire = lock.acquire
        self.release = lock.release

        self._waiters = collections.deque()

    def __repr__(self):
        state = 'locked' if self.locked() else 'unlocked'
        if self._waiters:
            state = f'{state}, waiters:{len(self._waiters)}'
        # Splice the state into the default "<... object at 0x...>" form.
        base = super().__repr__()
        return f'<{base[1:-1]} [{state}]>'

    async def wait(self):
        """Wait until notified.

        Raises RuntimeError if the lock is not held when this method is
        called.

        The underlying lock is released, then the caller blocks until a
        notify() or notify_all() call on this condition wakes it.  After
        waking, the lock is re-acquired and True is returned.
        """
        if not self.locked():
            raise RuntimeError('cannot wait on un-acquired lock')

        self.release()
        try:
            waiter = self._loop.create_future()
            self._waiters.append(waiter)
            try:
                await waiter
                return True
            finally:
                self._waiters.remove(waiter)

        finally:
            # The lock must be held again when we leave, even if the wait
            # was cancelled.  Cancellations arriving while re-acquiring
            # are remembered and re-raised once the lock is held.
            was_cancelled = False
            while True:
                try:
                    await self.acquire()
                except exceptions.CancelledError:
                    was_cancelled = True
                else:
                    break

            if was_cancelled:
                raise exceptions.CancelledError

    async def wait_for(self, predicate):
        """Wait until a predicate becomes true.

        predicate is a callable whose result is interpreted as a boolean.
        The final (truthy) predicate value is returned.
        """
        while True:
            outcome = predicate()
            if outcome:
                return outcome
            await self.wait()

    def notify(self, n=1):
        """Wake up at most n coroutines waiting on this condition (one by
        default).  Raises RuntimeError if the lock is not held when this
        method is called.

        It is a no-op if no coroutines are waiting.

        Note: an awakened coroutine does not actually return from its
        wait() call until it can reacquire the lock.  Since notify() does
        not release the lock, its caller should.
        """
        if not self.locked():
            raise RuntimeError('cannot notify on un-acquired lock')

        woken = 0
        for waiter in self._waiters:
            if woken >= n:
                break
            if waiter.done():
                # Already woken or cancelled; does not count towards n.
                continue
            woken += 1
            waiter.set_result(False)

    def notify_all(self):
        """Wake up all coroutines waiting on this condition.  Acts like
        notify(), but wakes every waiter instead of one.  Raises
        RuntimeError if the lock is not held when this method is called.
        """
        self.notify(len(self._waiters))
417
Guido van Rossumccea0842013-11-04 13:18:19 -0800418
class Semaphore(_ContextManagerMixin):
    """A Semaphore implementation.

    A semaphore keeps an internal counter: each acquire() decrements it
    and each release() increments it.  The counter never goes below
    zero; an acquire() that finds it at zero blocks until a release()
    call makes it positive again.

    Semaphores also support the context management protocol.

    The optional argument is the initial value of the counter and
    defaults to 1.  A value less than 0 raises ValueError.
    """

    def __init__(self, value=1, *, loop=None):
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._value = value
        self._waiters = collections.deque()
        # Fall back to the current event loop when none is supplied.
        self._loop = events.get_event_loop() if loop is None else loop

    def __repr__(self):
        if self.locked():
            state = 'locked'
        else:
            state = f'unlocked, value:{self._value}'
        if self._waiters:
            state = f'{state}, waiters:{len(self._waiters)}'
        # Splice the state into the default "<... object at 0x...>" form.
        base = super().__repr__()
        return f'<{base[1:-1]} [{state}]>'

    def _wake_up_next(self):
        # Pop waiters from the front until one is found that is still
        # pending; wake that one and stop.  Done/cancelled futures are
        # simply discarded.
        while self._waiters:
            candidate = self._waiters.popleft()
            if candidate.done():
                continue
            candidate.set_result(None)
            return

    def locked(self):
        """Returns True if semaphore can not be acquired immediately."""
        return self._value == 0

    async def acquire(self):
        """Acquire a semaphore.

        If the internal counter is larger than zero on entry, decrement
        it by one and return True immediately.  If it is zero on entry,
        block until some other coroutine has called release() to make it
        larger than 0, and then return True.
        """
        # NOTE(review): a woken waiter that finds the counter at zero
        # again loops and queues a fresh future at the back of the deque.
        while self._value <= 0:
            waiter = self._loop.create_future()
            self._waiters.append(waiter)
            try:
                await waiter
            except BaseException:
                # See the similar code in Queue.get.
                waiter.cancel()
                # If we had been woken (future not cancelled) and a slot
                # is free, pass the wake-up to the next waiter.
                if self._value > 0 and not waiter.cancelled():
                    self._wake_up_next()
                raise
        self._value -= 1
        return True

    def release(self):
        """Release a semaphore, incrementing the internal counter by one.
        If another coroutine is waiting for the counter to become larger
        than zero, wake up that coroutine.
        """
        self._value += 1
        self._wake_up_next()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700492
Guido van Rossum085869b2013-11-23 15:09:16 -0800493
class BoundedSemaphore(Semaphore):
    """A bounded semaphore implementation.

    release() raises ValueError instead of letting the counter grow
    beyond the value the semaphore was created with.
    """

    def __init__(self, value=1, *, loop=None):
        # Remember the ceiling before the base class validates and
        # stores the initial counter value.
        self._bound_value = value
        super().__init__(value, loop=loop)

    def release(self):
        at_ceiling = self._value >= self._bound_value
        if at_ceiling:
            raise ValueError('BoundedSemaphore released too many times')
        super().release()