blob: 61938373509678da94cf1d4a7ceea13bc07eeff2 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Synchronization primitives."""
2
Yury Selivanov6370f342017-12-10 18:36:12 -05003__all__ = ('Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
5import collections
Andrew Svetlov28d8d142017-12-09 20:00:05 +02006import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007
8from . import events
9from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020010from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
12
Guido van Rossumab3c8892014-01-25 16:51:57 -080013class _ContextManager:
14 """Context manager.
15
16 This enables the following idiom for acquiring and releasing a
17 lock around a block:
18
19 with (yield from lock):
20 <block>
21
22 while failing loudly when accidentally using:
23
24 with lock:
25 <block>
Andrew Svetlov88743422017-12-11 17:35:49 +020026
27 Deprecated, use 'async with' statement:
28 async with lock:
29 <block>
Guido van Rossumab3c8892014-01-25 16:51:57 -080030 """
31
32 def __init__(self, lock):
33 self._lock = lock
34
35 def __enter__(self):
36 # We have no use for the "as ..." clause in the with
37 # statement for locks.
38 return None
39
40 def __exit__(self, *args):
41 try:
42 self._lock.release()
43 finally:
44 self._lock = None # Crudely prevent reuse.
45
46
Yury Selivanovd08c3632015-05-13 15:15:56 -040047class _ContextManagerMixin:
48 def __enter__(self):
49 raise RuntimeError(
50 '"yield from" should be used as context manager expression')
51
52 def __exit__(self, *args):
53 # This must exist because __enter__ exists, even though that
54 # always raises; that's how the with-statement works.
55 pass
56
57 @coroutine
58 def __iter__(self):
59 # This is not a coroutine. It is meant to enable the idiom:
60 #
61 # with (yield from lock):
62 # <block>
63 #
64 # as an alternative to:
65 #
66 # yield from lock.acquire()
67 # try:
68 # <block>
69 # finally:
70 # lock.release()
Andrew Svetlov88743422017-12-11 17:35:49 +020071 # Deprecated, use 'async with' statement:
72 # async with lock:
73 # <block>
Andrew Svetlov28d8d142017-12-09 20:00:05 +020074 warnings.warn("'with (yield from lock)' is deprecated "
75 "use 'async with lock' instead",
76 DeprecationWarning, stacklevel=2)
Yury Selivanovd08c3632015-05-13 15:15:56 -040077 yield from self.acquire()
78 return _ContextManager(self)
79
Andrew Svetlov5f841b52017-12-09 00:23:48 +020080 async def __acquire_ctx(self):
81 await self.acquire()
Victor Stinner3f438a92017-11-28 14:43:52 +010082 return _ContextManager(self)
Yury Selivanovd08c3632015-05-13 15:15:56 -040083
Andrew Svetlov5f841b52017-12-09 00:23:48 +020084 def __await__(self):
Andrew Svetlov28d8d142017-12-09 20:00:05 +020085 warnings.warn("'with await lock' is deprecated "
86 "use 'async with lock' instead",
87 DeprecationWarning, stacklevel=2)
Andrew Svetlov5f841b52017-12-09 00:23:48 +020088 # To make "with await lock" work.
89 return self.__acquire_ctx().__await__()
90
91 async def __aenter__(self):
92 await self.acquire()
Victor Stinner3f438a92017-11-28 14:43:52 +010093 # We have no use for the "as ..." clause in the with
94 # statement for locks.
95 return None
Yury Selivanovd08c3632015-05-13 15:15:56 -040096
Andrew Svetlov5f841b52017-12-09 00:23:48 +020097 async def __aexit__(self, exc_type, exc, tb):
Victor Stinner3f438a92017-11-28 14:43:52 +010098 self.release()
Yury Selivanovd08c3632015-05-13 15:15:56 -040099
100
101class Lock(_ContextManagerMixin):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700102 """Primitive lock objects.
103
104 A primitive lock is a synchronization primitive that is not owned
105 by a particular coroutine when locked. A primitive lock is in one
106 of two states, 'locked' or 'unlocked'.
107
108 It is created in the unlocked state. It has two basic methods,
109 acquire() and release(). When the state is unlocked, acquire()
110 changes the state to locked and returns immediately. When the
111 state is locked, acquire() blocks until a call to release() in
112 another coroutine changes it to unlocked, then the acquire() call
113 resets it to locked and returns. The release() method should only
114 be called in the locked state; it changes the state to unlocked
115 and returns immediately. If an attempt is made to release an
116 unlocked lock, a RuntimeError will be raised.
117
118 When more than one coroutine is blocked in acquire() waiting for
119 the state to turn to unlocked, only one coroutine proceeds when a
120 release() call resets the state to unlocked; first coroutine which
121 is blocked in acquire() is being processed.
122
Andrew Svetlov88743422017-12-11 17:35:49 +0200123 acquire() is a coroutine and should be called with 'await'.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700124
Andrew Svetlov88743422017-12-11 17:35:49 +0200125 Locks also support the asynchronous context management protocol.
126 'async with lock' statement should be used.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700127
128 Usage:
129
130 lock = Lock()
131 ...
Andrew Svetlov88743422017-12-11 17:35:49 +0200132 await lock.acquire()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700133 try:
134 ...
135 finally:
136 lock.release()
137
138 Context manager usage:
139
140 lock = Lock()
141 ...
Andrew Svetlov88743422017-12-11 17:35:49 +0200142 async with lock:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700143 ...
144
145 Lock objects can be tested for locking state:
146
147 if not lock.locked():
Andrew Svetlov88743422017-12-11 17:35:49 +0200148 await lock.acquire()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700149 else:
150 # lock is acquired
151 ...
152
153 """
154
155 def __init__(self, *, loop=None):
156 self._waiters = collections.deque()
157 self._locked = False
158 if loop is not None:
159 self._loop = loop
160 else:
161 self._loop = events.get_event_loop()
162
163 def __repr__(self):
164 res = super().__repr__()
165 extra = 'locked' if self._locked else 'unlocked'
166 if self._waiters:
Yury Selivanov6370f342017-12-10 18:36:12 -0500167 extra = f'{extra}, waiters:{len(self._waiters)}'
168 return f'<{res[1:-1]} [{extra}]>'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700169
170 def locked(self):
Victor Stinnerc37dd612013-12-02 14:31:16 +0100171 """Return True if lock is acquired."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700172 return self._locked
173
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200174 async def acquire(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700175 """Acquire a lock.
176
177 This method blocks until the lock is unlocked, then sets it to
178 locked and returns True.
179 """
Guido van Rossum83f5a382016-08-23 09:39:03 -0700180 if not self._locked and all(w.cancelled() for w in self._waiters):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700181 self._locked = True
182 return True
183
Yury Selivanov7661db62016-05-16 15:38:39 -0400184 fut = self._loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700185 self._waiters.append(fut)
186 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200187 await fut
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700188 self._locked = True
189 return True
Mathieu Sornay894a6542017-06-09 22:17:40 +0200190 except futures.CancelledError:
191 if not self._locked:
192 self._wake_up_first()
193 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700194 finally:
195 self._waiters.remove(fut)
196
197 def release(self):
198 """Release a lock.
199
200 When the lock is locked, reset it to unlocked, and return.
201 If any other coroutines are blocked waiting for the lock to become
202 unlocked, allow exactly one of them to proceed.
203
204 When invoked on an unlocked lock, a RuntimeError is raised.
205
206 There is no return value.
207 """
208 if self._locked:
209 self._locked = False
Mathieu Sornay894a6542017-06-09 22:17:40 +0200210 self._wake_up_first()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700211 else:
212 raise RuntimeError('Lock is not acquired.')
213
Mathieu Sornay894a6542017-06-09 22:17:40 +0200214 def _wake_up_first(self):
215 """Wake up the first waiter who isn't cancelled."""
216 for fut in self._waiters:
217 if not fut.done():
218 fut.set_result(True)
219 break
220
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700221
222class Event:
Guido van Rossum994bf432013-12-19 12:47:38 -0800223 """Asynchronous equivalent to threading.Event.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700224
225 Class implementing event objects. An event manages a flag that can be set
226 to true with the set() method and reset to false with the clear() method.
227 The wait() method blocks until the flag is true. The flag is initially
228 false.
229 """
230
231 def __init__(self, *, loop=None):
232 self._waiters = collections.deque()
233 self._value = False
234 if loop is not None:
235 self._loop = loop
236 else:
237 self._loop = events.get_event_loop()
238
239 def __repr__(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700240 res = super().__repr__()
Guido van Rossumccea0842013-11-04 13:18:19 -0800241 extra = 'set' if self._value else 'unset'
242 if self._waiters:
Yury Selivanov6370f342017-12-10 18:36:12 -0500243 extra = f'{extra}, waiters:{len(self._waiters)}'
244 return f'<{res[1:-1]} [{extra}]>'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700245
246 def is_set(self):
Victor Stinnerc37dd612013-12-02 14:31:16 +0100247 """Return True if and only if the internal flag is true."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700248 return self._value
249
250 def set(self):
251 """Set the internal flag to true. All coroutines waiting for it to
252 become true are awakened. Coroutine that call wait() once the flag is
253 true will not block at all.
254 """
255 if not self._value:
256 self._value = True
257
258 for fut in self._waiters:
259 if not fut.done():
260 fut.set_result(True)
261
262 def clear(self):
263 """Reset the internal flag to false. Subsequently, coroutines calling
264 wait() will block until set() is called to set the internal flag
265 to true again."""
266 self._value = False
267
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200268 async def wait(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700269 """Block until the internal flag is true.
270
271 If the internal flag is true on entry, return True
272 immediately. Otherwise, block until another coroutine calls
273 set() to set the flag to true, then return True.
274 """
275 if self._value:
276 return True
277
Yury Selivanov7661db62016-05-16 15:38:39 -0400278 fut = self._loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700279 self._waiters.append(fut)
280 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200281 await fut
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700282 return True
283 finally:
284 self._waiters.remove(fut)
285
286
Yury Selivanovd08c3632015-05-13 15:15:56 -0400287class Condition(_ContextManagerMixin):
Guido van Rossum994bf432013-12-19 12:47:38 -0800288 """Asynchronous equivalent to threading.Condition.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700289
290 This class implements condition variable objects. A condition variable
291 allows one or more coroutines to wait until they are notified by another
292 coroutine.
Guido van Rossumccea0842013-11-04 13:18:19 -0800293
294 A new Lock object is created and used as the underlying lock.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700295 """
296
Andrew Svetlovf21fcd02014-07-26 17:54:34 +0300297 def __init__(self, lock=None, *, loop=None):
Guido van Rossumccea0842013-11-04 13:18:19 -0800298 if loop is not None:
299 self._loop = loop
300 else:
301 self._loop = events.get_event_loop()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700302
Andrew Svetlovf21fcd02014-07-26 17:54:34 +0300303 if lock is None:
304 lock = Lock(loop=self._loop)
305 elif lock._loop is not self._loop:
306 raise ValueError("loop argument must agree with lock")
307
Guido van Rossumccea0842013-11-04 13:18:19 -0800308 self._lock = lock
309 # Export the lock's locked(), acquire() and release() methods.
310 self.locked = lock.locked
311 self.acquire = lock.acquire
312 self.release = lock.release
313
314 self._waiters = collections.deque()
315
316 def __repr__(self):
317 res = super().__repr__()
318 extra = 'locked' if self.locked() else 'unlocked'
319 if self._waiters:
Yury Selivanov6370f342017-12-10 18:36:12 -0500320 extra = f'{extra}, waiters:{len(self._waiters)}'
321 return f'<{res[1:-1]} [{extra}]>'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700322
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200323 async def wait(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700324 """Wait until notified.
325
326 If the calling coroutine has not acquired the lock when this
327 method is called, a RuntimeError is raised.
328
329 This method releases the underlying lock, and then blocks
330 until it is awakened by a notify() or notify_all() call for
331 the same condition variable in another coroutine. Once
332 awakened, it re-acquires the lock and returns True.
333 """
Guido van Rossumccea0842013-11-04 13:18:19 -0800334 if not self.locked():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700335 raise RuntimeError('cannot wait on un-acquired lock')
336
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700337 self.release()
338 try:
Yury Selivanov7661db62016-05-16 15:38:39 -0400339 fut = self._loop.create_future()
Guido van Rossumccea0842013-11-04 13:18:19 -0800340 self._waiters.append(fut)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700341 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200342 await fut
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700343 return True
344 finally:
Guido van Rossumccea0842013-11-04 13:18:19 -0800345 self._waiters.remove(fut)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700346
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700347 finally:
Yury Selivanovc92bf832016-06-11 12:00:07 -0400348 # Must reacquire lock even if wait is cancelled
349 while True:
350 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200351 await self.acquire()
Yury Selivanovc92bf832016-06-11 12:00:07 -0400352 break
353 except futures.CancelledError:
354 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700355
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200356 async def wait_for(self, predicate):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700357 """Wait until a predicate becomes true.
358
359 The predicate should be a callable which result will be
360 interpreted as a boolean value. The final predicate value is
361 the return value.
362 """
363 result = predicate()
364 while not result:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200365 await self.wait()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700366 result = predicate()
367 return result
368
369 def notify(self, n=1):
370 """By default, wake up one coroutine waiting on this condition, if any.
371 If the calling coroutine has not acquired the lock when this method
372 is called, a RuntimeError is raised.
373
374 This method wakes up at most n of the coroutines waiting for the
375 condition variable; it is a no-op if no coroutines are waiting.
376
377 Note: an awakened coroutine does not actually return from its
378 wait() call until it can reacquire the lock. Since notify() does
379 not release the lock, its caller should.
380 """
Guido van Rossumccea0842013-11-04 13:18:19 -0800381 if not self.locked():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700382 raise RuntimeError('cannot notify on un-acquired lock')
383
384 idx = 0
Guido van Rossumccea0842013-11-04 13:18:19 -0800385 for fut in self._waiters:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700386 if idx >= n:
387 break
388
389 if not fut.done():
390 idx += 1
391 fut.set_result(False)
392
393 def notify_all(self):
394 """Wake up all threads waiting on this condition. This method acts
395 like notify(), but wakes up all waiting threads instead of one. If the
396 calling thread has not acquired the lock when this method is called,
397 a RuntimeError is raised.
398 """
Guido van Rossumccea0842013-11-04 13:18:19 -0800399 self.notify(len(self._waiters))
400
Guido van Rossumccea0842013-11-04 13:18:19 -0800401
Yury Selivanovd08c3632015-05-13 15:15:56 -0400402class Semaphore(_ContextManagerMixin):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700403 """A Semaphore implementation.
404
405 A semaphore manages an internal counter which is decremented by each
406 acquire() call and incremented by each release() call. The counter
407 can never go below zero; when acquire() finds that it is zero, it blocks,
408 waiting until some other thread calls release().
409
Serhiy Storchaka14867992014-09-10 23:43:41 +0300410 Semaphores also support the context management protocol.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700411
Guido van Rossum085869b2013-11-23 15:09:16 -0800412 The optional argument gives the initial value for the internal
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700413 counter; it defaults to 1. If the value given is less than 0,
414 ValueError is raised.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700415 """
416
Guido van Rossum085869b2013-11-23 15:09:16 -0800417 def __init__(self, value=1, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700418 if value < 0:
Guido van Rossum9c55a582013-11-21 11:07:45 -0800419 raise ValueError("Semaphore initial value must be >= 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700420 self._value = value
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700421 self._waiters = collections.deque()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700422 if loop is not None:
423 self._loop = loop
424 else:
425 self._loop = events.get_event_loop()
426
427 def __repr__(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700428 res = super().__repr__()
Yury Selivanov6370f342017-12-10 18:36:12 -0500429 extra = 'locked' if self.locked() else f'unlocked, value:{self._value}'
Guido van Rossumccea0842013-11-04 13:18:19 -0800430 if self._waiters:
Yury Selivanov6370f342017-12-10 18:36:12 -0500431 extra = f'{extra}, waiters:{len(self._waiters)}'
432 return f'<{res[1:-1]} [{extra}]>'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700433
Guido van Rossumd455a502015-09-29 11:54:45 -0700434 def _wake_up_next(self):
435 while self._waiters:
436 waiter = self._waiters.popleft()
437 if not waiter.done():
438 waiter.set_result(None)
439 return
440
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700441 def locked(self):
442 """Returns True if semaphore can not be acquired immediately."""
Guido van Rossumab3c8892014-01-25 16:51:57 -0800443 return self._value == 0
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700444
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200445 async def acquire(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700446 """Acquire a semaphore.
447
448 If the internal counter is larger than zero on entry,
449 decrement it by one and return True immediately. If it is
450 zero on entry, block, waiting until some other coroutine has
451 called release() to make it larger than 0, and then return
452 True.
453 """
Guido van Rossumd455a502015-09-29 11:54:45 -0700454 while self._value <= 0:
Yury Selivanov7661db62016-05-16 15:38:39 -0400455 fut = self._loop.create_future()
Guido van Rossumd455a502015-09-29 11:54:45 -0700456 self._waiters.append(fut)
457 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200458 await fut
Guido van Rossumd455a502015-09-29 11:54:45 -0700459 except:
460 # See the similar code in Queue.get.
461 fut.cancel()
462 if self._value > 0 and not fut.cancelled():
463 self._wake_up_next()
464 raise
465 self._value -= 1
466 return True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700467
468 def release(self):
469 """Release a semaphore, incrementing the internal counter by one.
470 When it was zero on entry and another coroutine is waiting for it to
471 become larger than zero again, wake up that coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700472 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700473 self._value += 1
Guido van Rossumd455a502015-09-29 11:54:45 -0700474 self._wake_up_next()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700475
Guido van Rossum085869b2013-11-23 15:09:16 -0800476
477class BoundedSemaphore(Semaphore):
478 """A bounded semaphore implementation.
479
480 This raises ValueError in release() if it would increase the value
481 above the initial value.
482 """
483
484 def __init__(self, value=1, *, loop=None):
485 self._bound_value = value
486 super().__init__(value, loop=loop)
487
488 def release(self):
489 if self._value >= self._bound_value:
490 raise ValueError('BoundedSemaphore released too many times')
491 super().release()