blob: 750c43591791e8beb0422c856f1bf0a45af552d4 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Synchronization primitives."""
2
Guido van Rossuma58d1c32013-11-24 22:32:09 -08003__all__ = ['Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
5import collections
6
7from . import events
8from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +02009from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010
11
Guido van Rossumab3c8892014-01-25 16:51:57 -080012class _ContextManager:
13 """Context manager.
14
15 This enables the following idiom for acquiring and releasing a
16 lock around a block:
17
18 with (yield from lock):
19 <block>
20
21 while failing loudly when accidentally using:
22
23 with lock:
24 <block>
25 """
26
27 def __init__(self, lock):
28 self._lock = lock
29
30 def __enter__(self):
31 # We have no use for the "as ..." clause in the with
32 # statement for locks.
33 return None
34
35 def __exit__(self, *args):
36 try:
37 self._lock.release()
38 finally:
39 self._lock = None # Crudely prevent reuse.
40
41
Yury Selivanovd08c3632015-05-13 15:15:56 -040042class _ContextManagerMixin:
43 def __enter__(self):
44 raise RuntimeError(
45 '"yield from" should be used as context manager expression')
46
47 def __exit__(self, *args):
48 # This must exist because __enter__ exists, even though that
49 # always raises; that's how the with-statement works.
50 pass
51
52 @coroutine
53 def __iter__(self):
54 # This is not a coroutine. It is meant to enable the idiom:
55 #
56 # with (yield from lock):
57 # <block>
58 #
59 # as an alternative to:
60 #
61 # yield from lock.acquire()
62 # try:
63 # <block>
64 # finally:
65 # lock.release()
66 yield from self.acquire()
67 return _ContextManager(self)
68
Victor Stinner3f438a92017-11-28 14:43:52 +010069 def __await__(self):
70 # To make "with await lock" work.
71 yield from self.acquire()
72 return _ContextManager(self)
Yury Selivanovd08c3632015-05-13 15:15:56 -040073
Victor Stinner3f438a92017-11-28 14:43:52 +010074 @coroutine
75 def __aenter__(self):
76 yield from self.acquire()
77 # We have no use for the "as ..." clause in the with
78 # statement for locks.
79 return None
Yury Selivanovd08c3632015-05-13 15:15:56 -040080
Victor Stinner3f438a92017-11-28 14:43:52 +010081 @coroutine
82 def __aexit__(self, exc_type, exc, tb):
83 self.release()
Yury Selivanovd08c3632015-05-13 15:15:56 -040084
85
86class Lock(_ContextManagerMixin):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070087 """Primitive lock objects.
88
89 A primitive lock is a synchronization primitive that is not owned
90 by a particular coroutine when locked. A primitive lock is in one
91 of two states, 'locked' or 'unlocked'.
92
93 It is created in the unlocked state. It has two basic methods,
94 acquire() and release(). When the state is unlocked, acquire()
95 changes the state to locked and returns immediately. When the
96 state is locked, acquire() blocks until a call to release() in
97 another coroutine changes it to unlocked, then the acquire() call
98 resets it to locked and returns. The release() method should only
99 be called in the locked state; it changes the state to unlocked
100 and returns immediately. If an attempt is made to release an
101 unlocked lock, a RuntimeError will be raised.
102
103 When more than one coroutine is blocked in acquire() waiting for
104 the state to turn to unlocked, only one coroutine proceeds when a
105 release() call resets the state to unlocked; first coroutine which
106 is blocked in acquire() is being processed.
107
108 acquire() is a coroutine and should be called with 'yield from'.
109
Serhiy Storchaka14867992014-09-10 23:43:41 +0300110 Locks also support the context management protocol. '(yield from lock)'
Martin Panter3ee62702016-06-04 04:57:19 +0000111 should be used as the context manager expression.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700112
113 Usage:
114
115 lock = Lock()
116 ...
117 yield from lock
118 try:
119 ...
120 finally:
121 lock.release()
122
123 Context manager usage:
124
125 lock = Lock()
126 ...
127 with (yield from lock):
128 ...
129
130 Lock objects can be tested for locking state:
131
132 if not lock.locked():
133 yield from lock
134 else:
135 # lock is acquired
136 ...
137
138 """
139
140 def __init__(self, *, loop=None):
141 self._waiters = collections.deque()
142 self._locked = False
143 if loop is not None:
144 self._loop = loop
145 else:
146 self._loop = events.get_event_loop()
147
148 def __repr__(self):
149 res = super().__repr__()
150 extra = 'locked' if self._locked else 'unlocked'
151 if self._waiters:
152 extra = '{},waiters:{}'.format(extra, len(self._waiters))
153 return '<{} [{}]>'.format(res[1:-1], extra)
154
155 def locked(self):
Victor Stinnerc37dd612013-12-02 14:31:16 +0100156 """Return True if lock is acquired."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700157 return self._locked
158
Victor Stinnerf951d282014-06-29 00:46:45 +0200159 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700160 def acquire(self):
161 """Acquire a lock.
162
163 This method blocks until the lock is unlocked, then sets it to
164 locked and returns True.
165 """
Guido van Rossum83f5a382016-08-23 09:39:03 -0700166 if not self._locked and all(w.cancelled() for w in self._waiters):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700167 self._locked = True
168 return True
169
Yury Selivanov7661db62016-05-16 15:38:39 -0400170 fut = self._loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700171 self._waiters.append(fut)
172 try:
173 yield from fut
174 self._locked = True
175 return True
Mathieu Sornay894a6542017-06-09 22:17:40 +0200176 except futures.CancelledError:
177 if not self._locked:
178 self._wake_up_first()
179 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700180 finally:
181 self._waiters.remove(fut)
182
183 def release(self):
184 """Release a lock.
185
186 When the lock is locked, reset it to unlocked, and return.
187 If any other coroutines are blocked waiting for the lock to become
188 unlocked, allow exactly one of them to proceed.
189
190 When invoked on an unlocked lock, a RuntimeError is raised.
191
192 There is no return value.
193 """
194 if self._locked:
195 self._locked = False
Mathieu Sornay894a6542017-06-09 22:17:40 +0200196 self._wake_up_first()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700197 else:
198 raise RuntimeError('Lock is not acquired.')
199
Mathieu Sornay894a6542017-06-09 22:17:40 +0200200 def _wake_up_first(self):
201 """Wake up the first waiter who isn't cancelled."""
202 for fut in self._waiters:
203 if not fut.done():
204 fut.set_result(True)
205 break
206
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700207
208class Event:
Guido van Rossum994bf432013-12-19 12:47:38 -0800209 """Asynchronous equivalent to threading.Event.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700210
211 Class implementing event objects. An event manages a flag that can be set
212 to true with the set() method and reset to false with the clear() method.
213 The wait() method blocks until the flag is true. The flag is initially
214 false.
215 """
216
217 def __init__(self, *, loop=None):
218 self._waiters = collections.deque()
219 self._value = False
220 if loop is not None:
221 self._loop = loop
222 else:
223 self._loop = events.get_event_loop()
224
225 def __repr__(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700226 res = super().__repr__()
Guido van Rossumccea0842013-11-04 13:18:19 -0800227 extra = 'set' if self._value else 'unset'
228 if self._waiters:
229 extra = '{},waiters:{}'.format(extra, len(self._waiters))
230 return '<{} [{}]>'.format(res[1:-1], extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700231
232 def is_set(self):
Victor Stinnerc37dd612013-12-02 14:31:16 +0100233 """Return True if and only if the internal flag is true."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700234 return self._value
235
236 def set(self):
237 """Set the internal flag to true. All coroutines waiting for it to
238 become true are awakened. Coroutine that call wait() once the flag is
239 true will not block at all.
240 """
241 if not self._value:
242 self._value = True
243
244 for fut in self._waiters:
245 if not fut.done():
246 fut.set_result(True)
247
248 def clear(self):
249 """Reset the internal flag to false. Subsequently, coroutines calling
250 wait() will block until set() is called to set the internal flag
251 to true again."""
252 self._value = False
253
Victor Stinnerf951d282014-06-29 00:46:45 +0200254 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700255 def wait(self):
256 """Block until the internal flag is true.
257
258 If the internal flag is true on entry, return True
259 immediately. Otherwise, block until another coroutine calls
260 set() to set the flag to true, then return True.
261 """
262 if self._value:
263 return True
264
Yury Selivanov7661db62016-05-16 15:38:39 -0400265 fut = self._loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700266 self._waiters.append(fut)
267 try:
268 yield from fut
269 return True
270 finally:
271 self._waiters.remove(fut)
272
273
Yury Selivanovd08c3632015-05-13 15:15:56 -0400274class Condition(_ContextManagerMixin):
Guido van Rossum994bf432013-12-19 12:47:38 -0800275 """Asynchronous equivalent to threading.Condition.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700276
277 This class implements condition variable objects. A condition variable
278 allows one or more coroutines to wait until they are notified by another
279 coroutine.
Guido van Rossumccea0842013-11-04 13:18:19 -0800280
281 A new Lock object is created and used as the underlying lock.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700282 """
283
Andrew Svetlovf21fcd02014-07-26 17:54:34 +0300284 def __init__(self, lock=None, *, loop=None):
Guido van Rossumccea0842013-11-04 13:18:19 -0800285 if loop is not None:
286 self._loop = loop
287 else:
288 self._loop = events.get_event_loop()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700289
Andrew Svetlovf21fcd02014-07-26 17:54:34 +0300290 if lock is None:
291 lock = Lock(loop=self._loop)
292 elif lock._loop is not self._loop:
293 raise ValueError("loop argument must agree with lock")
294
Guido van Rossumccea0842013-11-04 13:18:19 -0800295 self._lock = lock
296 # Export the lock's locked(), acquire() and release() methods.
297 self.locked = lock.locked
298 self.acquire = lock.acquire
299 self.release = lock.release
300
301 self._waiters = collections.deque()
302
303 def __repr__(self):
304 res = super().__repr__()
305 extra = 'locked' if self.locked() else 'unlocked'
306 if self._waiters:
307 extra = '{},waiters:{}'.format(extra, len(self._waiters))
308 return '<{} [{}]>'.format(res[1:-1], extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700309
Victor Stinnerf951d282014-06-29 00:46:45 +0200310 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700311 def wait(self):
312 """Wait until notified.
313
314 If the calling coroutine has not acquired the lock when this
315 method is called, a RuntimeError is raised.
316
317 This method releases the underlying lock, and then blocks
318 until it is awakened by a notify() or notify_all() call for
319 the same condition variable in another coroutine. Once
320 awakened, it re-acquires the lock and returns True.
321 """
Guido van Rossumccea0842013-11-04 13:18:19 -0800322 if not self.locked():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700323 raise RuntimeError('cannot wait on un-acquired lock')
324
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700325 self.release()
326 try:
Yury Selivanov7661db62016-05-16 15:38:39 -0400327 fut = self._loop.create_future()
Guido van Rossumccea0842013-11-04 13:18:19 -0800328 self._waiters.append(fut)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700329 try:
330 yield from fut
331 return True
332 finally:
Guido van Rossumccea0842013-11-04 13:18:19 -0800333 self._waiters.remove(fut)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700334
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700335 finally:
Yury Selivanovc92bf832016-06-11 12:00:07 -0400336 # Must reacquire lock even if wait is cancelled
337 while True:
338 try:
339 yield from self.acquire()
340 break
341 except futures.CancelledError:
342 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700343
Victor Stinnerf951d282014-06-29 00:46:45 +0200344 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700345 def wait_for(self, predicate):
346 """Wait until a predicate becomes true.
347
348 The predicate should be a callable which result will be
349 interpreted as a boolean value. The final predicate value is
350 the return value.
351 """
352 result = predicate()
353 while not result:
354 yield from self.wait()
355 result = predicate()
356 return result
357
358 def notify(self, n=1):
359 """By default, wake up one coroutine waiting on this condition, if any.
360 If the calling coroutine has not acquired the lock when this method
361 is called, a RuntimeError is raised.
362
363 This method wakes up at most n of the coroutines waiting for the
364 condition variable; it is a no-op if no coroutines are waiting.
365
366 Note: an awakened coroutine does not actually return from its
367 wait() call until it can reacquire the lock. Since notify() does
368 not release the lock, its caller should.
369 """
Guido van Rossumccea0842013-11-04 13:18:19 -0800370 if not self.locked():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700371 raise RuntimeError('cannot notify on un-acquired lock')
372
373 idx = 0
Guido van Rossumccea0842013-11-04 13:18:19 -0800374 for fut in self._waiters:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700375 if idx >= n:
376 break
377
378 if not fut.done():
379 idx += 1
380 fut.set_result(False)
381
382 def notify_all(self):
383 """Wake up all threads waiting on this condition. This method acts
384 like notify(), but wakes up all waiting threads instead of one. If the
385 calling thread has not acquired the lock when this method is called,
386 a RuntimeError is raised.
387 """
Guido van Rossumccea0842013-11-04 13:18:19 -0800388 self.notify(len(self._waiters))
389
Guido van Rossumccea0842013-11-04 13:18:19 -0800390
Yury Selivanovd08c3632015-05-13 15:15:56 -0400391class Semaphore(_ContextManagerMixin):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700392 """A Semaphore implementation.
393
394 A semaphore manages an internal counter which is decremented by each
395 acquire() call and incremented by each release() call. The counter
396 can never go below zero; when acquire() finds that it is zero, it blocks,
397 waiting until some other thread calls release().
398
Serhiy Storchaka14867992014-09-10 23:43:41 +0300399 Semaphores also support the context management protocol.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700400
Guido van Rossum085869b2013-11-23 15:09:16 -0800401 The optional argument gives the initial value for the internal
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700402 counter; it defaults to 1. If the value given is less than 0,
403 ValueError is raised.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700404 """
405
Guido van Rossum085869b2013-11-23 15:09:16 -0800406 def __init__(self, value=1, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700407 if value < 0:
Guido van Rossum9c55a582013-11-21 11:07:45 -0800408 raise ValueError("Semaphore initial value must be >= 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700409 self._value = value
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700410 self._waiters = collections.deque()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700411 if loop is not None:
412 self._loop = loop
413 else:
414 self._loop = events.get_event_loop()
415
416 def __repr__(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700417 res = super().__repr__()
Guido van Rossumab3c8892014-01-25 16:51:57 -0800418 extra = 'locked' if self.locked() else 'unlocked,value:{}'.format(
Guido van Rossumccea0842013-11-04 13:18:19 -0800419 self._value)
420 if self._waiters:
421 extra = '{},waiters:{}'.format(extra, len(self._waiters))
422 return '<{} [{}]>'.format(res[1:-1], extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700423
Guido van Rossumd455a502015-09-29 11:54:45 -0700424 def _wake_up_next(self):
425 while self._waiters:
426 waiter = self._waiters.popleft()
427 if not waiter.done():
428 waiter.set_result(None)
429 return
430
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700431 def locked(self):
432 """Returns True if semaphore can not be acquired immediately."""
Guido van Rossumab3c8892014-01-25 16:51:57 -0800433 return self._value == 0
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700434
Victor Stinnerf951d282014-06-29 00:46:45 +0200435 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700436 def acquire(self):
437 """Acquire a semaphore.
438
439 If the internal counter is larger than zero on entry,
440 decrement it by one and return True immediately. If it is
441 zero on entry, block, waiting until some other coroutine has
442 called release() to make it larger than 0, and then return
443 True.
444 """
Guido van Rossumd455a502015-09-29 11:54:45 -0700445 while self._value <= 0:
Yury Selivanov7661db62016-05-16 15:38:39 -0400446 fut = self._loop.create_future()
Guido van Rossumd455a502015-09-29 11:54:45 -0700447 self._waiters.append(fut)
448 try:
449 yield from fut
450 except:
451 # See the similar code in Queue.get.
452 fut.cancel()
453 if self._value > 0 and not fut.cancelled():
454 self._wake_up_next()
455 raise
456 self._value -= 1
457 return True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700458
459 def release(self):
460 """Release a semaphore, incrementing the internal counter by one.
461 When it was zero on entry and another coroutine is waiting for it to
462 become larger than zero again, wake up that coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700463 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700464 self._value += 1
Guido van Rossumd455a502015-09-29 11:54:45 -0700465 self._wake_up_next()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700466
Guido van Rossum085869b2013-11-23 15:09:16 -0800467
468class BoundedSemaphore(Semaphore):
469 """A bounded semaphore implementation.
470
471 This raises ValueError in release() if it would increase the value
472 above the initial value.
473 """
474
475 def __init__(self, value=1, *, loop=None):
476 self._bound_value = value
477 super().__init__(value, loop=loop)
478
479 def release(self):
480 if self._value >= self._bound_value:
481 raise ValueError('BoundedSemaphore released too many times')
482 super().release()