blob: 92661830a0622830da008527b6fca71314b35d36 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Synchronization primitives."""
2
Guido van Rossuma58d1c32013-11-24 22:32:09 -08003__all__ = ['Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
5import collections
6
Victor Stinner71080fc2015-07-25 02:23:21 +02007from . import compat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008from . import events
9from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020010from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
12
Guido van Rossumab3c8892014-01-25 16:51:57 -080013class _ContextManager:
14 """Context manager.
15
16 This enables the following idiom for acquiring and releasing a
17 lock around a block:
18
19 with (yield from lock):
20 <block>
21
22 while failing loudly when accidentally using:
23
24 with lock:
25 <block>
26 """
27
28 def __init__(self, lock):
29 self._lock = lock
30
31 def __enter__(self):
32 # We have no use for the "as ..." clause in the with
33 # statement for locks.
34 return None
35
36 def __exit__(self, *args):
37 try:
38 self._lock.release()
39 finally:
40 self._lock = None # Crudely prevent reuse.
41
42
Yury Selivanovd08c3632015-05-13 15:15:56 -040043class _ContextManagerMixin:
44 def __enter__(self):
45 raise RuntimeError(
46 '"yield from" should be used as context manager expression')
47
48 def __exit__(self, *args):
49 # This must exist because __enter__ exists, even though that
50 # always raises; that's how the with-statement works.
51 pass
52
53 @coroutine
54 def __iter__(self):
55 # This is not a coroutine. It is meant to enable the idiom:
56 #
57 # with (yield from lock):
58 # <block>
59 #
60 # as an alternative to:
61 #
62 # yield from lock.acquire()
63 # try:
64 # <block>
65 # finally:
66 # lock.release()
67 yield from self.acquire()
68 return _ContextManager(self)
69
Victor Stinner71080fc2015-07-25 02:23:21 +020070 if compat.PY35:
Yury Selivanovd08c3632015-05-13 15:15:56 -040071
72 def __await__(self):
73 # To make "with await lock" work.
74 yield from self.acquire()
75 return _ContextManager(self)
76
77 @coroutine
78 def __aenter__(self):
79 yield from self.acquire()
80 # We have no use for the "as ..." clause in the with
81 # statement for locks.
82 return None
83
84 @coroutine
85 def __aexit__(self, exc_type, exc, tb):
86 self.release()
87
88
89class Lock(_ContextManagerMixin):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070090 """Primitive lock objects.
91
92 A primitive lock is a synchronization primitive that is not owned
93 by a particular coroutine when locked. A primitive lock is in one
94 of two states, 'locked' or 'unlocked'.
95
96 It is created in the unlocked state. It has two basic methods,
97 acquire() and release(). When the state is unlocked, acquire()
98 changes the state to locked and returns immediately. When the
99 state is locked, acquire() blocks until a call to release() in
100 another coroutine changes it to unlocked, then the acquire() call
101 resets it to locked and returns. The release() method should only
102 be called in the locked state; it changes the state to unlocked
103 and returns immediately. If an attempt is made to release an
104 unlocked lock, a RuntimeError will be raised.
105
106 When more than one coroutine is blocked in acquire() waiting for
107 the state to turn to unlocked, only one coroutine proceeds when a
108 release() call resets the state to unlocked; first coroutine which
109 is blocked in acquire() is being processed.
110
111 acquire() is a coroutine and should be called with 'yield from'.
112
Serhiy Storchaka14867992014-09-10 23:43:41 +0300113 Locks also support the context management protocol. '(yield from lock)'
Martin Panter3ee62702016-06-04 04:57:19 +0000114 should be used as the context manager expression.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700115
116 Usage:
117
118 lock = Lock()
119 ...
120 yield from lock
121 try:
122 ...
123 finally:
124 lock.release()
125
126 Context manager usage:
127
128 lock = Lock()
129 ...
130 with (yield from lock):
131 ...
132
133 Lock objects can be tested for locking state:
134
135 if not lock.locked():
136 yield from lock
137 else:
138 # lock is acquired
139 ...
140
141 """
142
143 def __init__(self, *, loop=None):
144 self._waiters = collections.deque()
145 self._locked = False
146 if loop is not None:
147 self._loop = loop
148 else:
149 self._loop = events.get_event_loop()
150
151 def __repr__(self):
152 res = super().__repr__()
153 extra = 'locked' if self._locked else 'unlocked'
154 if self._waiters:
155 extra = '{},waiters:{}'.format(extra, len(self._waiters))
156 return '<{} [{}]>'.format(res[1:-1], extra)
157
158 def locked(self):
Victor Stinnerc37dd612013-12-02 14:31:16 +0100159 """Return True if lock is acquired."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700160 return self._locked
161
Victor Stinnerf951d282014-06-29 00:46:45 +0200162 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700163 def acquire(self):
164 """Acquire a lock.
165
166 This method blocks until the lock is unlocked, then sets it to
167 locked and returns True.
168 """
Guido van Rossum83f5a382016-08-23 09:39:03 -0700169 if not self._locked and all(w.cancelled() for w in self._waiters):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700170 self._locked = True
171 return True
172
Yury Selivanov7661db62016-05-16 15:38:39 -0400173 fut = self._loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700174 self._waiters.append(fut)
175 try:
176 yield from fut
177 self._locked = True
178 return True
Mathieu Sornay894a6542017-06-09 22:17:40 +0200179 except futures.CancelledError:
180 if not self._locked:
181 self._wake_up_first()
182 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700183 finally:
184 self._waiters.remove(fut)
185
186 def release(self):
187 """Release a lock.
188
189 When the lock is locked, reset it to unlocked, and return.
190 If any other coroutines are blocked waiting for the lock to become
191 unlocked, allow exactly one of them to proceed.
192
193 When invoked on an unlocked lock, a RuntimeError is raised.
194
195 There is no return value.
196 """
197 if self._locked:
198 self._locked = False
Mathieu Sornay894a6542017-06-09 22:17:40 +0200199 self._wake_up_first()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700200 else:
201 raise RuntimeError('Lock is not acquired.')
202
Mathieu Sornay894a6542017-06-09 22:17:40 +0200203 def _wake_up_first(self):
204 """Wake up the first waiter who isn't cancelled."""
205 for fut in self._waiters:
206 if not fut.done():
207 fut.set_result(True)
208 break
209
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700210
211class Event:
Guido van Rossum994bf432013-12-19 12:47:38 -0800212 """Asynchronous equivalent to threading.Event.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700213
214 Class implementing event objects. An event manages a flag that can be set
215 to true with the set() method and reset to false with the clear() method.
216 The wait() method blocks until the flag is true. The flag is initially
217 false.
218 """
219
220 def __init__(self, *, loop=None):
221 self._waiters = collections.deque()
222 self._value = False
223 if loop is not None:
224 self._loop = loop
225 else:
226 self._loop = events.get_event_loop()
227
228 def __repr__(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700229 res = super().__repr__()
Guido van Rossumccea0842013-11-04 13:18:19 -0800230 extra = 'set' if self._value else 'unset'
231 if self._waiters:
232 extra = '{},waiters:{}'.format(extra, len(self._waiters))
233 return '<{} [{}]>'.format(res[1:-1], extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700234
235 def is_set(self):
Victor Stinnerc37dd612013-12-02 14:31:16 +0100236 """Return True if and only if the internal flag is true."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700237 return self._value
238
239 def set(self):
240 """Set the internal flag to true. All coroutines waiting for it to
241 become true are awakened. Coroutine that call wait() once the flag is
242 true will not block at all.
243 """
244 if not self._value:
245 self._value = True
246
247 for fut in self._waiters:
248 if not fut.done():
249 fut.set_result(True)
250
251 def clear(self):
252 """Reset the internal flag to false. Subsequently, coroutines calling
253 wait() will block until set() is called to set the internal flag
254 to true again."""
255 self._value = False
256
Victor Stinnerf951d282014-06-29 00:46:45 +0200257 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700258 def wait(self):
259 """Block until the internal flag is true.
260
261 If the internal flag is true on entry, return True
262 immediately. Otherwise, block until another coroutine calls
263 set() to set the flag to true, then return True.
264 """
265 if self._value:
266 return True
267
Yury Selivanov7661db62016-05-16 15:38:39 -0400268 fut = self._loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700269 self._waiters.append(fut)
270 try:
271 yield from fut
272 return True
273 finally:
274 self._waiters.remove(fut)
275
276
Yury Selivanovd08c3632015-05-13 15:15:56 -0400277class Condition(_ContextManagerMixin):
Guido van Rossum994bf432013-12-19 12:47:38 -0800278 """Asynchronous equivalent to threading.Condition.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700279
280 This class implements condition variable objects. A condition variable
281 allows one or more coroutines to wait until they are notified by another
282 coroutine.
Guido van Rossumccea0842013-11-04 13:18:19 -0800283
284 A new Lock object is created and used as the underlying lock.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700285 """
286
Andrew Svetlovf21fcd02014-07-26 17:54:34 +0300287 def __init__(self, lock=None, *, loop=None):
Guido van Rossumccea0842013-11-04 13:18:19 -0800288 if loop is not None:
289 self._loop = loop
290 else:
291 self._loop = events.get_event_loop()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700292
Andrew Svetlovf21fcd02014-07-26 17:54:34 +0300293 if lock is None:
294 lock = Lock(loop=self._loop)
295 elif lock._loop is not self._loop:
296 raise ValueError("loop argument must agree with lock")
297
Guido van Rossumccea0842013-11-04 13:18:19 -0800298 self._lock = lock
299 # Export the lock's locked(), acquire() and release() methods.
300 self.locked = lock.locked
301 self.acquire = lock.acquire
302 self.release = lock.release
303
304 self._waiters = collections.deque()
305
306 def __repr__(self):
307 res = super().__repr__()
308 extra = 'locked' if self.locked() else 'unlocked'
309 if self._waiters:
310 extra = '{},waiters:{}'.format(extra, len(self._waiters))
311 return '<{} [{}]>'.format(res[1:-1], extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700312
Victor Stinnerf951d282014-06-29 00:46:45 +0200313 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700314 def wait(self):
315 """Wait until notified.
316
317 If the calling coroutine has not acquired the lock when this
318 method is called, a RuntimeError is raised.
319
320 This method releases the underlying lock, and then blocks
321 until it is awakened by a notify() or notify_all() call for
322 the same condition variable in another coroutine. Once
323 awakened, it re-acquires the lock and returns True.
324 """
Guido van Rossumccea0842013-11-04 13:18:19 -0800325 if not self.locked():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700326 raise RuntimeError('cannot wait on un-acquired lock')
327
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700328 self.release()
329 try:
Yury Selivanov7661db62016-05-16 15:38:39 -0400330 fut = self._loop.create_future()
Guido van Rossumccea0842013-11-04 13:18:19 -0800331 self._waiters.append(fut)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700332 try:
333 yield from fut
334 return True
335 finally:
Guido van Rossumccea0842013-11-04 13:18:19 -0800336 self._waiters.remove(fut)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700337
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700338 finally:
Yury Selivanovc92bf832016-06-11 12:00:07 -0400339 # Must reacquire lock even if wait is cancelled
340 while True:
341 try:
342 yield from self.acquire()
343 break
344 except futures.CancelledError:
345 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700346
Victor Stinnerf951d282014-06-29 00:46:45 +0200347 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700348 def wait_for(self, predicate):
349 """Wait until a predicate becomes true.
350
351 The predicate should be a callable which result will be
352 interpreted as a boolean value. The final predicate value is
353 the return value.
354 """
355 result = predicate()
356 while not result:
357 yield from self.wait()
358 result = predicate()
359 return result
360
361 def notify(self, n=1):
362 """By default, wake up one coroutine waiting on this condition, if any.
363 If the calling coroutine has not acquired the lock when this method
364 is called, a RuntimeError is raised.
365
366 This method wakes up at most n of the coroutines waiting for the
367 condition variable; it is a no-op if no coroutines are waiting.
368
369 Note: an awakened coroutine does not actually return from its
370 wait() call until it can reacquire the lock. Since notify() does
371 not release the lock, its caller should.
372 """
Guido van Rossumccea0842013-11-04 13:18:19 -0800373 if not self.locked():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700374 raise RuntimeError('cannot notify on un-acquired lock')
375
376 idx = 0
Guido van Rossumccea0842013-11-04 13:18:19 -0800377 for fut in self._waiters:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700378 if idx >= n:
379 break
380
381 if not fut.done():
382 idx += 1
383 fut.set_result(False)
384
385 def notify_all(self):
386 """Wake up all threads waiting on this condition. This method acts
387 like notify(), but wakes up all waiting threads instead of one. If the
388 calling thread has not acquired the lock when this method is called,
389 a RuntimeError is raised.
390 """
Guido van Rossumccea0842013-11-04 13:18:19 -0800391 self.notify(len(self._waiters))
392
Guido van Rossumccea0842013-11-04 13:18:19 -0800393
Yury Selivanovd08c3632015-05-13 15:15:56 -0400394class Semaphore(_ContextManagerMixin):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700395 """A Semaphore implementation.
396
397 A semaphore manages an internal counter which is decremented by each
398 acquire() call and incremented by each release() call. The counter
399 can never go below zero; when acquire() finds that it is zero, it blocks,
400 waiting until some other thread calls release().
401
Serhiy Storchaka14867992014-09-10 23:43:41 +0300402 Semaphores also support the context management protocol.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700403
Guido van Rossum085869b2013-11-23 15:09:16 -0800404 The optional argument gives the initial value for the internal
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700405 counter; it defaults to 1. If the value given is less than 0,
406 ValueError is raised.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700407 """
408
Guido van Rossum085869b2013-11-23 15:09:16 -0800409 def __init__(self, value=1, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700410 if value < 0:
Guido van Rossum9c55a582013-11-21 11:07:45 -0800411 raise ValueError("Semaphore initial value must be >= 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700412 self._value = value
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700413 self._waiters = collections.deque()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700414 if loop is not None:
415 self._loop = loop
416 else:
417 self._loop = events.get_event_loop()
418
419 def __repr__(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700420 res = super().__repr__()
Guido van Rossumab3c8892014-01-25 16:51:57 -0800421 extra = 'locked' if self.locked() else 'unlocked,value:{}'.format(
Guido van Rossumccea0842013-11-04 13:18:19 -0800422 self._value)
423 if self._waiters:
424 extra = '{},waiters:{}'.format(extra, len(self._waiters))
425 return '<{} [{}]>'.format(res[1:-1], extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700426
Guido van Rossumd455a502015-09-29 11:54:45 -0700427 def _wake_up_next(self):
428 while self._waiters:
429 waiter = self._waiters.popleft()
430 if not waiter.done():
431 waiter.set_result(None)
432 return
433
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700434 def locked(self):
435 """Returns True if semaphore can not be acquired immediately."""
Guido van Rossumab3c8892014-01-25 16:51:57 -0800436 return self._value == 0
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700437
Victor Stinnerf951d282014-06-29 00:46:45 +0200438 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700439 def acquire(self):
440 """Acquire a semaphore.
441
442 If the internal counter is larger than zero on entry,
443 decrement it by one and return True immediately. If it is
444 zero on entry, block, waiting until some other coroutine has
445 called release() to make it larger than 0, and then return
446 True.
447 """
Guido van Rossumd455a502015-09-29 11:54:45 -0700448 while self._value <= 0:
Yury Selivanov7661db62016-05-16 15:38:39 -0400449 fut = self._loop.create_future()
Guido van Rossumd455a502015-09-29 11:54:45 -0700450 self._waiters.append(fut)
451 try:
452 yield from fut
453 except:
454 # See the similar code in Queue.get.
455 fut.cancel()
456 if self._value > 0 and not fut.cancelled():
457 self._wake_up_next()
458 raise
459 self._value -= 1
460 return True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700461
462 def release(self):
463 """Release a semaphore, incrementing the internal counter by one.
464 When it was zero on entry and another coroutine is waiting for it to
465 become larger than zero again, wake up that coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700466 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700467 self._value += 1
Guido van Rossumd455a502015-09-29 11:54:45 -0700468 self._wake_up_next()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700469
Guido van Rossum085869b2013-11-23 15:09:16 -0800470
471class BoundedSemaphore(Semaphore):
472 """A bounded semaphore implementation.
473
474 This raises ValueError in release() if it would increase the value
475 above the initial value.
476 """
477
478 def __init__(self, value=1, *, loop=None):
479 self._bound_value = value
480 super().__init__(value, loop=loop)
481
482 def release(self):
483 if self._value >= self._bound_value:
484 raise ValueError('BoundedSemaphore released too many times')
485 super().release()