blob: 57eb69e7cf082f2c9b7166e02bd5cf976be040f7 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Synchronization primitives."""
2
Guido van Rossuma58d1c32013-11-24 22:32:09 -08003__all__ = ['Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
5import collections
Andrew Svetlov28d8d142017-12-09 20:00:05 +02006import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007
8from . import events
9from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020010from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
12
Guido van Rossumab3c8892014-01-25 16:51:57 -080013class _ContextManager:
14 """Context manager.
15
16 This enables the following idiom for acquiring and releasing a
17 lock around a block:
18
19 with (yield from lock):
20 <block>
21
22 while failing loudly when accidentally using:
23
24 with lock:
25 <block>
26 """
27
28 def __init__(self, lock):
29 self._lock = lock
30
31 def __enter__(self):
32 # We have no use for the "as ..." clause in the with
33 # statement for locks.
34 return None
35
36 def __exit__(self, *args):
37 try:
38 self._lock.release()
39 finally:
40 self._lock = None # Crudely prevent reuse.
41
42
class _ContextManagerMixin:
    """Mixin giving a synchronization primitive context-manager support.

    The host class must provide an acquire() coroutine and a release()
    method.  'async with obj' acquires on entry and releases on exit.
    The older 'with (yield from obj)' and 'with await obj' forms still
    work but emit a DeprecationWarning; a plain 'with obj' always raises
    RuntimeError.
    """

    def __enter__(self):
        raise RuntimeError(
            '"yield from" should be used as context manager expression')

    def __exit__(self, *args):
        # Required by the with-statement protocol because __enter__ is
        # defined, even though __enter__ always raises.
        pass

    @coroutine
    def __iter__(self):
        # Not a coroutine in the usual sense: this exists so that
        #
        #     with (yield from lock):
        #         <block>
        #
        # can be written instead of:
        #
        #     yield from lock.acquire()
        #     try:
        #         <block>
        #     finally:
        #         lock.release()
        warnings.warn("'with (yield from lock)' is deprecated "
                      "use 'async with lock' instead",
                      DeprecationWarning, stacklevel=2)
        yield from self.acquire()
        return _ContextManager(self)

    async def __acquire_ctx(self):
        # Acquire, then hand back a manager that releases on exit.
        await self.acquire()
        return _ContextManager(self)

    def __await__(self):
        warnings.warn("'with await lock' is deprecated "
                      "use 'async with lock' instead",
                      DeprecationWarning, stacklevel=2)
        # Delegating to the helper coroutine is what makes
        # "with await lock" work.
        acquirer = self.__acquire_ctx()
        return acquirer.__await__()

    async def __aenter__(self):
        await self.acquire()
        # There is nothing useful to bind with the "as ..." clause of
        # an async with statement for locks.
        return None

    async def __aexit__(self, exc_type, exc, tb):
        # Release unconditionally; the exception (if any) propagates.
        self.release()
Yury Selivanovd08c3632015-05-13 15:15:56 -040092
93
class Lock(_ContextManagerMixin):
    """Primitive lock objects.

    A primitive lock is a synchronization primitive that is not owned
    by a particular coroutine when locked.  A primitive lock is in one
    of two states, 'locked' or 'unlocked'.

    It is created in the unlocked state.  It has two basic methods,
    acquire() and release().  When the state is unlocked, acquire()
    changes the state to locked and returns immediately.  When the
    state is locked, acquire() blocks until a call to release() in
    another coroutine changes it to unlocked, then the acquire() call
    resets it to locked and returns.  The release() method should only
    be called in the locked state; it changes the state to unlocked
    and returns immediately.  If an attempt is made to release an
    unlocked lock, a RuntimeError will be raised.

    When more than one coroutine is blocked in acquire() waiting for
    the state to turn to unlocked, only one coroutine proceeds when a
    release() call resets the state to unlocked; first coroutine which
    is blocked in acquire() is being processed.

    acquire() is a coroutine and should be called with 'await'.

    Locks also support the asynchronous context management protocol.
    'async with lock' statement should be used.

    Usage:

        lock = Lock()
        ...
        await lock.acquire()
        try:
            ...
        finally:
            lock.release()

    Context manager usage:

        lock = Lock()
        ...
        async with lock:
            ...

    Lock objects can be tested for locking state:

        if not lock.locked():
            await lock.acquire()
        else:
            # lock is acquired
            ...

    """

    def __init__(self, *, loop=None):
        # Futures of coroutines blocked in acquire(), in arrival order.
        self._waiters = collections.deque()
        self._locked = False
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self._locked else 'unlocked'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    def locked(self):
        """Return True if lock is acquired."""
        return self._locked

    async def acquire(self):
        """Acquire a lock.

        This method blocks until the lock is unlocked, then sets it to
        locked and returns True.

        If the wait is cancelled, CancelledError propagates; when the
        lock is unlocked at that moment the wake-up is passed on to the
        next waiter so it is not lost.
        """
        # Fast path: only take the lock directly when nobody with a live
        # claim is queued ahead of us.
        if not self._locked and all(w.cancelled() for w in self._waiters):
            self._locked = True
            return True

        fut = self._loop.create_future()
        self._waiters.append(fut)

        # The inner finally must run before the CancelledError handler:
        # our own future has to be out of the queue before
        # _wake_up_first() inspects the head of it.
        try:
            try:
                await fut
            finally:
                self._waiters.remove(fut)
        except futures.CancelledError:
            if not self._locked:
                self._wake_up_first()
            raise

        self._locked = True
        return True

    def release(self):
        """Release a lock.

        When the lock is locked, reset it to unlocked, and return.
        If any other coroutines are blocked waiting for the lock to become
        unlocked, allow exactly one of them to proceed.

        When invoked on an unlocked lock, a RuntimeError is raised.

        There is no return value.
        """
        if self._locked:
            self._locked = False
            self._wake_up_first()
        else:
            raise RuntimeError('Lock is not acquired.')

    def _wake_up_first(self):
        """Wake up the first waiter if it isn't done."""
        if not self._waiters:
            return
        fut = self._waiters[0]
        # Only the head of the queue is considered.  If it is already
        # done, that coroutine is about to run: it will either take the
        # lock or, if it was cancelled while the lock is free, call this
        # method again and wake the next waiter.  Skipping past it here
        # would wake two coroutines for a single release.
        if not fut.done():
            fut.set_result(True)
213
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700214
class Event:
    """Asynchronous equivalent to threading.Event.

    Class implementing event objects. An event manages a flag that can be set
    to true with the set() method and reset to false with the clear() method.
    The wait() method blocks until the flag is true. The flag is initially
    false.
    """

    def __init__(self, *, loop=None):
        # Futures of coroutines currently blocked in wait().
        self._waiters = collections.deque()
        self._value = False
        self._loop = events.get_event_loop() if loop is None else loop

    def __repr__(self):
        state = 'set' if self._value else 'unset'
        pending = len(self._waiters)
        if pending:
            state = '{},waiters:{}'.format(state, pending)
        # Reuse the default repr minus its surrounding angle brackets.
        return '<{} [{}]>'.format(super().__repr__()[1:-1], state)

    def is_set(self):
        """Return True if and only if the internal flag is true."""
        return self._value

    def set(self):
        """Set the internal flag to true. All coroutines waiting for it to
        become true are awakened. Coroutine that call wait() once the flag is
        true will not block at all.
        """
        if self._value:
            # Already set: every waiter has been resolved before.
            return
        self._value = True

        for fut in self._waiters:
            if not fut.done():
                fut.set_result(True)

    def clear(self):
        """Reset the internal flag to false. Subsequently, coroutines calling
        wait() will block until set() is called to set the internal flag
        to true again."""
        self._value = False

    async def wait(self):
        """Block until the internal flag is true.

        If the internal flag is true on entry, return True
        immediately.  Otherwise, block until another coroutine calls
        set() to set the flag to true, then return True.
        """
        if self._value:
            return True

        fut = self._loop.create_future()
        self._waiters.append(fut)
        try:
            await fut
        finally:
            # Drop our future whether we were woken or cancelled.
            self._waiters.remove(fut)
        return True
278
279
class Condition(_ContextManagerMixin):
    """Asynchronous equivalent to threading.Condition.

    This class implements condition variable objects. A condition variable
    allows one or more coroutines to wait until they are notified by another
    coroutine.

    If no lock is given, a new Lock object is created and used as the
    underlying lock.  A lock passed in explicitly must be bound to the
    same event loop as the condition, otherwise ValueError is raised.
    """

    def __init__(self, lock=None, *, loop=None):
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

        if lock is None:
            lock = Lock(loop=self._loop)
        elif lock._loop is not self._loop:
            raise ValueError("loop argument must agree with lock")

        self._lock = lock
        # Export the lock's locked(), acquire() and release() methods.
        self.locked = lock.locked
        self.acquire = lock.acquire
        self.release = lock.release

        # Futures of coroutines blocked in wait(), in arrival order.
        self._waiters = collections.deque()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self.locked() else 'unlocked'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    async def wait(self):
        """Wait until notified.

        If the calling coroutine has not acquired the lock when this
        method is called, a RuntimeError is raised.

        This method releases the underlying lock, and then blocks
        until it is awakened by a notify() or notify_all() call for
        the same condition variable in another coroutine.  Once
        awakened, it re-acquires the lock and returns True.

        The lock is always re-acquired before this method exits, even
        when the wait is cancelled.  A cancellation that arrives while
        the lock is being re-acquired is not lost: CancelledError is
        raised once the lock is held again.
        """
        if not self.locked():
            raise RuntimeError('cannot wait on un-acquired lock')

        self.release()
        try:
            fut = self._loop.create_future()
            self._waiters.append(fut)
            try:
                await fut
                return True
            finally:
                self._waiters.remove(fut)

        finally:
            # Must reacquire lock even if wait is cancelled.  Remember a
            # cancellation seen here instead of swallowing it, so the
            # caller still observes it once the lock is held.
            cancelled = False
            while True:
                try:
                    await self.acquire()
                    break
                except futures.CancelledError:
                    cancelled = True

            if cancelled:
                raise futures.CancelledError

    async def wait_for(self, predicate):
        """Wait until a predicate becomes true.

        The predicate should be a callable which result will be
        interpreted as a boolean value.  The final predicate value is
        the return value.
        """
        result = predicate()
        while not result:
            await self.wait()
            result = predicate()
        return result

    def notify(self, n=1):
        """By default, wake up one coroutine waiting on this condition, if any.
        If the calling coroutine has not acquired the lock when this method
        is called, a RuntimeError is raised.

        This method wakes up at most n of the coroutines waiting for the
        condition variable; it is a no-op if no coroutines are waiting.

        Note: an awakened coroutine does not actually return from its
        wait() call until it can reacquire the lock. Since notify() does
        not release the lock, its caller should.
        """
        if not self.locked():
            raise RuntimeError('cannot notify on un-acquired lock')

        idx = 0
        for fut in self._waiters:
            if idx >= n:
                break

            # Waiters that are already done (woken or cancelled) do not
            # count towards n.
            if not fut.done():
                idx += 1
                fut.set_result(False)

    def notify_all(self):
        """Wake up all coroutines waiting on this condition. This method acts
        like notify(), but wakes up all waiting coroutines instead of one. If
        the calling coroutine has not acquired the lock when this method is
        called, a RuntimeError is raised.
        """
        self.notify(len(self._waiters))
393
Guido van Rossumccea0842013-11-04 13:18:19 -0800394
class Semaphore(_ContextManagerMixin):
    """A Semaphore implementation.

    A semaphore manages an internal counter which is decremented by each
    acquire() call and incremented by each release() call. The counter
    can never go below zero; when acquire() finds that it is zero, it blocks,
    waiting until some other coroutine calls release().

    Semaphores also support the context management protocol.

    The optional argument gives the initial value for the internal
    counter; it defaults to 1. If the value given is less than 0,
    ValueError is raised.
    """

    def __init__(self, value=1, *, loop=None):
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._value = value
        # Futures of coroutines blocked in acquire(), in arrival order.
        self._waiters = collections.deque()
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        if self.locked():
            extra = 'locked'
        else:
            extra = 'unlocked,value:{}'.format(self._value)
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        res = super().__repr__()
        return '<{} [{}]>'.format(res[1:-1], extra)

    def _wake_up_next(self):
        # Pop waiters from the front until one that is still pending is
        # found, and wake exactly that one.
        waiters = self._waiters
        while waiters:
            fut = waiters.popleft()
            if fut.done():
                continue
            fut.set_result(None)
            break

    def locked(self):
        """Returns True if semaphore can not be acquired immediately."""
        return self._value == 0

    async def acquire(self):
        """Acquire a semaphore.

        If the internal counter is larger than zero on entry,
        decrement it by one and return True immediately.  If it is
        zero on entry, block, waiting until some other coroutine has
        called release() to make it larger than 0, and then return
        True.
        """
        while self._value <= 0:
            fut = self._loop.create_future()
            self._waiters.append(fut)
            try:
                await fut
            except BaseException:
                # See the similar code in Queue.get.
                # If cancel() has no effect, the future was already woken;
                # pass that wake-up on rather than losing it.
                fut.cancel()
                if self._value > 0 and not fut.cancelled():
                    self._wake_up_next()
                raise
        self._value -= 1
        return True

    def release(self):
        """Release a semaphore, incrementing the internal counter by one.
        When it was zero on entry and another coroutine is waiting for it to
        become larger than zero again, wake up that coroutine.
        """
        self._value += 1
        self._wake_up_next()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700469
Guido van Rossum085869b2013-11-23 15:09:16 -0800470
class BoundedSemaphore(Semaphore):
    """A bounded semaphore implementation.

    This raises ValueError in release() if it would increase the value
    above the initial value.
    """

    def __init__(self, value=1, *, loop=None):
        # Remember the starting value; it is the upper bound for release().
        self._bound_value = value
        super().__init__(value, loop=loop)

    def release(self):
        """Release the semaphore, refusing to exceed the initial value."""
        if self._value >= self._bound_value:
            raise ValueError('BoundedSemaphore released too many times')
        super().release()