blob: aa6ed3eaea69ffdaeffd99155c106f1d57b8b3be [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Synchronization primitives."""
2
Guido van Rossuma58d1c32013-11-24 22:32:09 -08003__all__ = ['Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
5import collections
6
7from . import events
8from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +02009from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010
11
class _ContextManager:
    """One-shot context manager that releases a lock on exit.

    It exists so that a lock can be acquired and released around a
    block with:

        with (yield from lock):
            <block>

    while the accidental, non-yielding form:

        with lock:
            <block>

    fails loudly instead of silently doing the wrong thing.
    """

    def __init__(self, lock):
        # Only release() is ever called on the stored object; the caller
        # is expected to have acquired it before building this manager.
        self._lock = lock

    def __enter__(self):
        # Nothing useful can be bound by an "as ..." clause for a lock,
        # so the with-statement target is always None.
        return None

    def __exit__(self, *args):
        # Drop our reference first (crudely prevents reuse: a second exit
        # fails with AttributeError on None), then release.  The reference
        # is cleared whether or not release() raises.
        held = self._lock
        self._lock = None
        held.release()
40
41
class _ContextManagerMixin:
    """Mixin giving a primitive its context-manager behaviour.

    The host class must supply an awaitable acquire() and a plain
    release().  Supported forms are 'with (yield from obj):',
    'with await obj:' and 'async with obj:'.  A bare 'with obj:' is
    rejected with RuntimeError.
    """

    def __enter__(self):
        message = '"yield from" should be used as context manager expression'
        raise RuntimeError(message)

    def __exit__(self, *args):
        # The with-statement requires __exit__ whenever __enter__ exists,
        # even though __enter__ above never returns normally.
        return None

    @coroutine
    def __iter__(self):
        # This is not meant to be used as a coroutine by callers.  It
        # exists so that
        #
        #     with (yield from lock):
        #         <block>
        #
        # can be written instead of
        #
        #     yield from lock.acquire()
        #     try:
        #         <block>
        #     finally:
        #         lock.release()
        yield from self.acquire()
        guard = _ContextManager(self)
        return guard

    async def __acquire_ctx(self):
        # Acquire, then hand back the object whose __exit__ releases.
        await self.acquire()
        guard = _ContextManager(self)
        return guard

    def __await__(self):
        # Makes "with await lock:" work.
        acquiring = self.__acquire_ctx()
        return acquiring.__await__()

    async def __aenter__(self):
        # Nothing useful can be bound by an "as ..." clause for a lock,
        # so the coroutine simply finishes with None.
        await self.acquire()

    async def __aexit__(self, exc_type, exc, tb):
        # Always release; the exception details are ignored and the
        # implicit None result lets any exception propagate.
        self.release()
Yury Selivanovd08c3632015-05-13 15:15:56 -040085
86
class Lock(_ContextManagerMixin):
    """Primitive lock objects.

    A primitive lock is a synchronization primitive that is not owned
    by a particular coroutine when locked.  A primitive lock is in one
    of two states, 'locked' or 'unlocked'.

    It is created in the unlocked state.  It has two basic methods,
    acquire() and release().  When the state is unlocked, acquire()
    changes the state to locked and returns immediately.  When the
    state is locked, acquire() blocks until a call to release() in
    another coroutine changes it to unlocked, then the acquire() call
    resets it to locked and returns.  The release() method should only
    be called in the locked state; it changes the state to unlocked
    and returns immediately.  If an attempt is made to release an
    unlocked lock, a RuntimeError will be raised.

    When more than one coroutine is blocked in acquire() waiting for
    the state to turn to unlocked, only one coroutine proceeds when a
    release() call resets the state to unlocked; the coroutine that has
    been blocked in acquire() the longest is the one that proceeds.

    acquire() is a coroutine and should be called with 'await'.

    Locks also support the asynchronous context management protocol;
    'async with lock' should be used.  The older '(yield from lock)'
    and 'await lock' context manager expressions still work.

    Usage:

        lock = Lock()
        ...
        await lock.acquire()
        try:
            ...
        finally:
            lock.release()

    Context manager usage:

        lock = Lock()
        ...
        async with lock:
            ...

    Lock objects can be tested for locking state:

        if not lock.locked():
            await lock.acquire()
        else:
            # lock is acquired
            ...

    """

    def __init__(self, *, loop=None):
        # FIFO queue of futures, one per coroutine blocked in acquire().
        self._waiters = collections.deque()
        self._locked = False
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self._locked else 'unlocked'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    def locked(self):
        """Return True if lock is acquired."""
        return self._locked

    async def acquire(self):
        """Acquire a lock.

        This method blocks until the lock is unlocked, then sets it to
        locked and returns True.

        If the wait is cancelled, CancelledError propagates; when the
        lock is free at that moment, the wake-up is passed on to the
        waiter at the head of the queue.
        """
        # Fast path: the lock is free and nobody with a live (that is,
        # non-cancelled) claim is queued ahead of us.
        if not self._locked and all(w.cancelled() for w in self._waiters):
            self._locked = True
            return True

        fut = self._loop.create_future()
        self._waiters.append(fut)

        # The inner finally must run before the CancelledError handling:
        # our own future has to be out of the queue before
        # _wake_up_first() looks at the head, otherwise a cancelled
        # waiter would find itself there and the wake-up would be lost.
        try:
            try:
                await fut
            finally:
                self._waiters.remove(fut)
        except futures.CancelledError:
            if not self._locked:
                self._wake_up_first()
            raise

        self._locked = True
        return True

    def release(self):
        """Release a lock.

        When the lock is locked, reset it to unlocked, and return.
        If any other coroutines are blocked waiting for the lock to become
        unlocked, allow exactly one of them to proceed.

        When invoked on an unlocked lock, a RuntimeError is raised.

        There is no return value.
        """
        if self._locked:
            self._locked = False
            self._wake_up_first()
        else:
            raise RuntimeError('Lock is not acquired.')

    def _wake_up_first(self):
        """Wake up the waiter at the head of the queue if it isn't done."""
        if not self._waiters:
            return
        fut = self._waiters[0]

        # Only the head is ever considered.  A head that is already done
        # will run later and either take the lock or, if it was
        # cancelled while the lock is still free, come back here and
        # wake a new head.  Skipping past it to the next pending waiter
        # could leave two woken coroutines that both believe they hold
        # the lock.
        if not fut.done():
            fut.set_result(True)
206
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700207
class Event:
    """Asynchronous equivalent to threading.Event.

    An event wraps a boolean flag, initially false.  set() turns the
    flag on, clear() turns it off again, and wait() blocks for as long
    as the flag is off.
    """

    def __init__(self, *, loop=None):
        # One future per coroutine currently blocked in wait().
        self._waiters = collections.deque()
        self._value = False
        self._loop = events.get_event_loop() if loop is None else loop

    def __repr__(self):
        base = super().__repr__()
        state = 'set' if self._value else 'unset'
        pending = len(self._waiters)
        if pending:
            state = '{},waiters:{}'.format(state, pending)
        # Strip the surrounding '<' and '>' of the default repr and
        # append the state in brackets.
        return '<{} [{}]>'.format(base[1:-1], state)

    def is_set(self):
        """Return True if and only if the internal flag is true."""
        return self._value

    def set(self):
        """Turn the internal flag on and wake every waiting coroutine.

        Coroutines that call wait() while the flag is on do not block.
        Calling set() when the flag is already on does nothing.
        """
        if self._value:
            return

        self._value = True
        for waiter in self._waiters:
            # Futures that are already done (e.g. cancelled) are skipped.
            if not waiter.done():
                waiter.set_result(True)

    def clear(self):
        """Turn the internal flag off.

        From now on wait() blocks again until set() is called.
        """
        self._value = False

    async def wait(self):
        """Block until the internal flag is true.

        Return True immediately when the flag is already on; otherwise
        block until set() is called and then return True.
        """
        if self._value:
            return True

        waiter = self._loop.create_future()
        self._waiters.append(waiter)
        try:
            await waiter
        finally:
            # Deregister even when the wait ends with an exception.
            self._waiters.remove(waiter)
        return True
271
272
class Condition(_ContextManagerMixin):
    """Asynchronous equivalent to threading.Condition.

    This class implements condition variable objects. A condition variable
    allows one or more coroutines to wait until they are notified by another
    coroutine.

    If no lock is passed in, a new Lock object is created and used as
    the underlying lock.  A lock that is passed in must be bound to the
    same event loop as the condition, otherwise ValueError is raised.
    """

    def __init__(self, lock=None, *, loop=None):
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

        if lock is None:
            lock = Lock(loop=self._loop)
        elif lock._loop is not self._loop:
            raise ValueError("loop argument must agree with lock")

        self._lock = lock
        # Export the lock's locked(), acquire() and release() methods.
        self.locked = lock.locked
        self.acquire = lock.acquire
        self.release = lock.release

        # One future per coroutine currently blocked in wait().
        self._waiters = collections.deque()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self.locked() else 'unlocked'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    async def wait(self):
        """Wait until notified.

        If the calling coroutine has not acquired the lock when this
        method is called, a RuntimeError is raised.

        This method releases the underlying lock, and then blocks
        until it is awakened by a notify() or notify_all() call for
        the same condition variable in another coroutine.  Once
        awakened, it re-acquires the lock and returns True.

        If the wait, or the subsequent re-acquisition of the lock, is
        cancelled, the lock is still re-acquired before CancelledError
        is raised.
        """
        if not self.locked():
            raise RuntimeError('cannot wait on un-acquired lock')

        self.release()
        try:
            fut = self._loop.create_future()
            self._waiters.append(fut)
            try:
                await fut
                return True
            finally:
                self._waiters.remove(fut)

        finally:
            # Must reacquire lock even if wait is cancelled.  A
            # cancellation that arrives while re-acquiring must not be
            # lost either: remember it, finish re-acquiring, and only
            # then report it to the caller.
            cancelled = False
            while True:
                try:
                    await self.acquire()
                    break
                except futures.CancelledError:
                    cancelled = True

            if cancelled:
                raise futures.CancelledError

    async def wait_for(self, predicate):
        """Wait until a predicate becomes true.

        The predicate should be a callable whose result will be
        interpreted as a boolean value.  It is evaluated once up front
        and again after every wake-up.  The final predicate value is
        the return value.
        """
        result = predicate()
        while not result:
            await self.wait()
            result = predicate()
        return result

    def notify(self, n=1):
        """By default, wake up one coroutine waiting on this condition, if any.
        If the calling coroutine has not acquired the lock when this method
        is called, a RuntimeError is raised.

        This method wakes up at most n of the coroutines waiting for the
        condition variable; it is a no-op if no coroutines are waiting.

        Note: an awakened coroutine does not actually return from its
        wait() call until it can reacquire the lock.  Since notify() does
        not release the lock, its caller should.
        """
        if not self.locked():
            raise RuntimeError('cannot notify on un-acquired lock')

        idx = 0
        for fut in self._waiters:
            if idx >= n:
                break

            # Waiters that are already done (woken earlier or cancelled)
            # do not count towards n.
            if not fut.done():
                idx += 1
                fut.set_result(False)

    def notify_all(self):
        """Wake up all coroutines waiting on this condition.  This method
        acts like notify(), but wakes up all waiting coroutines instead of
        one.  If the calling coroutine has not acquired the lock when this
        method is called, a RuntimeError is raised.
        """
        self.notify(len(self._waiters))
386
Guido van Rossumccea0842013-11-04 13:18:19 -0800387
class Semaphore(_ContextManagerMixin):
    """A Semaphore implementation.

    A semaphore keeps an internal counter: every acquire() takes one
    off and every release() puts one back.  The counter never drops
    below zero; an acquire() that finds it at zero blocks until some
    other coroutine calls release().

    Semaphores also support the context management protocol.

    The optional argument is the starting value of the counter and
    defaults to 1.  A value below 0 raises ValueError.
    """

    def __init__(self, value=1, *, loop=None):
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._value = value
        # Futures of coroutines blocked in acquire(), oldest first.
        self._waiters = collections.deque()
        self._loop = events.get_event_loop() if loop is None else loop

    def __repr__(self):
        base = super().__repr__()
        if self.locked():
            state = 'locked'
        else:
            state = 'unlocked,value:{}'.format(self._value)
        if self._waiters:
            state = '{},waiters:{}'.format(state, len(self._waiters))
        return '<{} [{}]>'.format(base[1:-1], state)

    def _wake_up_next(self):
        # Pop waiters off the front until one that is still pending is
        # found; wake that one and stop.  Already-done futures are
        # simply discarded.
        queue = self._waiters
        while queue:
            candidate = queue.popleft()
            if candidate.done():
                continue
            candidate.set_result(None)
            return

    def locked(self):
        """Returns True if semaphore can not be acquired immediately."""
        return self._value == 0

    async def acquire(self):
        """Acquire a semaphore.

        When the counter is above zero, take one off and return True
        right away.  When it is zero, block until another coroutine has
        called release() to raise it above zero, then take one off and
        return True.
        """
        while self._value <= 0:
            waiter = self._loop.create_future()
            self._waiters.append(waiter)
            try:
                await waiter
            except BaseException:
                # See the similar code in Queue.get.
                # cancel() is a no-op on a future that already has a
                # result; in that case we were woken but are bailing
                # out, so hand the wake-up to the next waiter if a slot
                # is available.
                waiter.cancel()
                if self._value > 0 and not waiter.cancelled():
                    self._wake_up_next()
                raise
        self._value -= 1
        return True

    def release(self):
        """Release a semaphore, incrementing the internal counter by one.

        If a coroutine is blocked waiting for the counter to rise above
        zero, wake it up.
        """
        self._value += 1
        self._wake_up_next()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700462
Guido van Rossum085869b2013-11-23 15:09:16 -0800463
class BoundedSemaphore(Semaphore):
    """A bounded semaphore implementation.

    release() raises ValueError when it would push the counter above
    the value the semaphore was created with.
    """

    def __init__(self, value=1, *, loop=None):
        # Remember the ceiling before the base class validates and
        # stores the starting value.
        self._bound_value = value
        super().__init__(value, loop=loop)

    def release(self):
        """Release the semaphore, refusing to exceed the initial value."""
        at_ceiling = self._value >= self._bound_value
        if at_ceiling:
            raise ValueError('BoundedSemaphore released too many times')
        super().release()