blob: deefc938ecfb01d3c0c7438dc6690d42711ffc71 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Synchronization primitives."""
2
Guido van Rossuma58d1c32013-11-24 22:32:09 -08003__all__ = ['Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
5import collections
6
Victor Stinner71080fc2015-07-25 02:23:21 +02007from . import compat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008from . import events
9from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020010from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
12
Guido van Rossumab3c8892014-01-25 16:51:57 -080013class _ContextManager:
14 """Context manager.
15
16 This enables the following idiom for acquiring and releasing a
17 lock around a block:
18
19 with (yield from lock):
20 <block>
21
22 while failing loudly when accidentally using:
23
24 with lock:
25 <block>
26 """
27
28 def __init__(self, lock):
29 self._lock = lock
30
31 def __enter__(self):
32 # We have no use for the "as ..." clause in the with
33 # statement for locks.
34 return None
35
36 def __exit__(self, *args):
37 try:
38 self._lock.release()
39 finally:
40 self._lock = None # Crudely prevent reuse.
41
42
Yury Selivanovd08c3632015-05-13 15:15:56 -040043class _ContextManagerMixin:
44 def __enter__(self):
45 raise RuntimeError(
46 '"yield from" should be used as context manager expression')
47
48 def __exit__(self, *args):
49 # This must exist because __enter__ exists, even though that
50 # always raises; that's how the with-statement works.
51 pass
52
53 @coroutine
54 def __iter__(self):
55 # This is not a coroutine. It is meant to enable the idiom:
56 #
57 # with (yield from lock):
58 # <block>
59 #
60 # as an alternative to:
61 #
62 # yield from lock.acquire()
63 # try:
64 # <block>
65 # finally:
66 # lock.release()
67 yield from self.acquire()
68 return _ContextManager(self)
69
Victor Stinner71080fc2015-07-25 02:23:21 +020070 if compat.PY35:
Yury Selivanovd08c3632015-05-13 15:15:56 -040071
72 def __await__(self):
73 # To make "with await lock" work.
74 yield from self.acquire()
75 return _ContextManager(self)
76
77 @coroutine
78 def __aenter__(self):
79 yield from self.acquire()
80 # We have no use for the "as ..." clause in the with
81 # statement for locks.
82 return None
83
84 @coroutine
85 def __aexit__(self, exc_type, exc, tb):
86 self.release()
87
88
89class Lock(_ContextManagerMixin):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070090 """Primitive lock objects.
91
92 A primitive lock is a synchronization primitive that is not owned
93 by a particular coroutine when locked. A primitive lock is in one
94 of two states, 'locked' or 'unlocked'.
95
96 It is created in the unlocked state. It has two basic methods,
97 acquire() and release(). When the state is unlocked, acquire()
98 changes the state to locked and returns immediately. When the
99 state is locked, acquire() blocks until a call to release() in
100 another coroutine changes it to unlocked, then the acquire() call
101 resets it to locked and returns. The release() method should only
102 be called in the locked state; it changes the state to unlocked
103 and returns immediately. If an attempt is made to release an
104 unlocked lock, a RuntimeError will be raised.
105
106 When more than one coroutine is blocked in acquire() waiting for
107 the state to turn to unlocked, only one coroutine proceeds when a
108 release() call resets the state to unlocked; first coroutine which
109 is blocked in acquire() is being processed.
110
111 acquire() is a coroutine and should be called with 'yield from'.
112
Serhiy Storchaka14867992014-09-10 23:43:41 +0300113 Locks also support the context management protocol. '(yield from lock)'
Martin Panter3ee62702016-06-04 04:57:19 +0000114 should be used as the context manager expression.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700115
116 Usage:
117
118 lock = Lock()
119 ...
120 yield from lock
121 try:
122 ...
123 finally:
124 lock.release()
125
126 Context manager usage:
127
128 lock = Lock()
129 ...
130 with (yield from lock):
131 ...
132
133 Lock objects can be tested for locking state:
134
135 if not lock.locked():
136 yield from lock
137 else:
138 # lock is acquired
139 ...
140
141 """
142
143 def __init__(self, *, loop=None):
144 self._waiters = collections.deque()
145 self._locked = False
146 if loop is not None:
147 self._loop = loop
148 else:
149 self._loop = events.get_event_loop()
150
151 def __repr__(self):
152 res = super().__repr__()
153 extra = 'locked' if self._locked else 'unlocked'
154 if self._waiters:
155 extra = '{},waiters:{}'.format(extra, len(self._waiters))
156 return '<{} [{}]>'.format(res[1:-1], extra)
157
158 def locked(self):
Victor Stinnerc37dd612013-12-02 14:31:16 +0100159 """Return True if lock is acquired."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700160 return self._locked
161
Victor Stinnerf951d282014-06-29 00:46:45 +0200162 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700163 def acquire(self):
164 """Acquire a lock.
165
166 This method blocks until the lock is unlocked, then sets it to
167 locked and returns True.
168 """
Guido van Rossum83f5a382016-08-23 09:39:03 -0700169 if not self._locked and all(w.cancelled() for w in self._waiters):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700170 self._locked = True
171 return True
172
Yury Selivanov7661db62016-05-16 15:38:39 -0400173 fut = self._loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700174 self._waiters.append(fut)
175 try:
176 yield from fut
177 self._locked = True
178 return True
179 finally:
180 self._waiters.remove(fut)
181
182 def release(self):
183 """Release a lock.
184
185 When the lock is locked, reset it to unlocked, and return.
186 If any other coroutines are blocked waiting for the lock to become
187 unlocked, allow exactly one of them to proceed.
188
189 When invoked on an unlocked lock, a RuntimeError is raised.
190
191 There is no return value.
192 """
193 if self._locked:
194 self._locked = False
195 # Wake up the first waiter who isn't cancelled.
196 for fut in self._waiters:
197 if not fut.done():
198 fut.set_result(True)
199 break
200 else:
201 raise RuntimeError('Lock is not acquired.')
202
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700203
204class Event:
Guido van Rossum994bf432013-12-19 12:47:38 -0800205 """Asynchronous equivalent to threading.Event.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700206
207 Class implementing event objects. An event manages a flag that can be set
208 to true with the set() method and reset to false with the clear() method.
209 The wait() method blocks until the flag is true. The flag is initially
210 false.
211 """
212
213 def __init__(self, *, loop=None):
214 self._waiters = collections.deque()
215 self._value = False
216 if loop is not None:
217 self._loop = loop
218 else:
219 self._loop = events.get_event_loop()
220
221 def __repr__(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700222 res = super().__repr__()
Guido van Rossumccea0842013-11-04 13:18:19 -0800223 extra = 'set' if self._value else 'unset'
224 if self._waiters:
225 extra = '{},waiters:{}'.format(extra, len(self._waiters))
226 return '<{} [{}]>'.format(res[1:-1], extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700227
228 def is_set(self):
Victor Stinnerc37dd612013-12-02 14:31:16 +0100229 """Return True if and only if the internal flag is true."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700230 return self._value
231
232 def set(self):
233 """Set the internal flag to true. All coroutines waiting for it to
234 become true are awakened. Coroutine that call wait() once the flag is
235 true will not block at all.
236 """
237 if not self._value:
238 self._value = True
239
240 for fut in self._waiters:
241 if not fut.done():
242 fut.set_result(True)
243
244 def clear(self):
245 """Reset the internal flag to false. Subsequently, coroutines calling
246 wait() will block until set() is called to set the internal flag
247 to true again."""
248 self._value = False
249
Victor Stinnerf951d282014-06-29 00:46:45 +0200250 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700251 def wait(self):
252 """Block until the internal flag is true.
253
254 If the internal flag is true on entry, return True
255 immediately. Otherwise, block until another coroutine calls
256 set() to set the flag to true, then return True.
257 """
258 if self._value:
259 return True
260
Yury Selivanov7661db62016-05-16 15:38:39 -0400261 fut = self._loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700262 self._waiters.append(fut)
263 try:
264 yield from fut
265 return True
266 finally:
267 self._waiters.remove(fut)
268
269
Yury Selivanovd08c3632015-05-13 15:15:56 -0400270class Condition(_ContextManagerMixin):
Guido van Rossum994bf432013-12-19 12:47:38 -0800271 """Asynchronous equivalent to threading.Condition.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700272
273 This class implements condition variable objects. A condition variable
274 allows one or more coroutines to wait until they are notified by another
275 coroutine.
Guido van Rossumccea0842013-11-04 13:18:19 -0800276
277 A new Lock object is created and used as the underlying lock.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700278 """
279
Andrew Svetlovf21fcd02014-07-26 17:54:34 +0300280 def __init__(self, lock=None, *, loop=None):
Guido van Rossumccea0842013-11-04 13:18:19 -0800281 if loop is not None:
282 self._loop = loop
283 else:
284 self._loop = events.get_event_loop()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700285
Andrew Svetlovf21fcd02014-07-26 17:54:34 +0300286 if lock is None:
287 lock = Lock(loop=self._loop)
288 elif lock._loop is not self._loop:
289 raise ValueError("loop argument must agree with lock")
290
Guido van Rossumccea0842013-11-04 13:18:19 -0800291 self._lock = lock
292 # Export the lock's locked(), acquire() and release() methods.
293 self.locked = lock.locked
294 self.acquire = lock.acquire
295 self.release = lock.release
296
297 self._waiters = collections.deque()
298
299 def __repr__(self):
300 res = super().__repr__()
301 extra = 'locked' if self.locked() else 'unlocked'
302 if self._waiters:
303 extra = '{},waiters:{}'.format(extra, len(self._waiters))
304 return '<{} [{}]>'.format(res[1:-1], extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700305
Victor Stinnerf951d282014-06-29 00:46:45 +0200306 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700307 def wait(self):
308 """Wait until notified.
309
310 If the calling coroutine has not acquired the lock when this
311 method is called, a RuntimeError is raised.
312
313 This method releases the underlying lock, and then blocks
314 until it is awakened by a notify() or notify_all() call for
315 the same condition variable in another coroutine. Once
316 awakened, it re-acquires the lock and returns True.
317 """
Guido van Rossumccea0842013-11-04 13:18:19 -0800318 if not self.locked():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700319 raise RuntimeError('cannot wait on un-acquired lock')
320
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700321 self.release()
322 try:
Yury Selivanov7661db62016-05-16 15:38:39 -0400323 fut = self._loop.create_future()
Guido van Rossumccea0842013-11-04 13:18:19 -0800324 self._waiters.append(fut)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700325 try:
326 yield from fut
327 return True
328 finally:
Guido van Rossumccea0842013-11-04 13:18:19 -0800329 self._waiters.remove(fut)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700330
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700331 finally:
Yury Selivanovc92bf832016-06-11 12:00:07 -0400332 # Must reacquire lock even if wait is cancelled
333 while True:
334 try:
335 yield from self.acquire()
336 break
337 except futures.CancelledError:
338 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700339
Victor Stinnerf951d282014-06-29 00:46:45 +0200340 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700341 def wait_for(self, predicate):
342 """Wait until a predicate becomes true.
343
344 The predicate should be a callable which result will be
345 interpreted as a boolean value. The final predicate value is
346 the return value.
347 """
348 result = predicate()
349 while not result:
350 yield from self.wait()
351 result = predicate()
352 return result
353
354 def notify(self, n=1):
355 """By default, wake up one coroutine waiting on this condition, if any.
356 If the calling coroutine has not acquired the lock when this method
357 is called, a RuntimeError is raised.
358
359 This method wakes up at most n of the coroutines waiting for the
360 condition variable; it is a no-op if no coroutines are waiting.
361
362 Note: an awakened coroutine does not actually return from its
363 wait() call until it can reacquire the lock. Since notify() does
364 not release the lock, its caller should.
365 """
Guido van Rossumccea0842013-11-04 13:18:19 -0800366 if not self.locked():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700367 raise RuntimeError('cannot notify on un-acquired lock')
368
369 idx = 0
Guido van Rossumccea0842013-11-04 13:18:19 -0800370 for fut in self._waiters:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700371 if idx >= n:
372 break
373
374 if not fut.done():
375 idx += 1
376 fut.set_result(False)
377
378 def notify_all(self):
379 """Wake up all threads waiting on this condition. This method acts
380 like notify(), but wakes up all waiting threads instead of one. If the
381 calling thread has not acquired the lock when this method is called,
382 a RuntimeError is raised.
383 """
Guido van Rossumccea0842013-11-04 13:18:19 -0800384 self.notify(len(self._waiters))
385
Guido van Rossumccea0842013-11-04 13:18:19 -0800386
Yury Selivanovd08c3632015-05-13 15:15:56 -0400387class Semaphore(_ContextManagerMixin):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700388 """A Semaphore implementation.
389
390 A semaphore manages an internal counter which is decremented by each
391 acquire() call and incremented by each release() call. The counter
392 can never go below zero; when acquire() finds that it is zero, it blocks,
393 waiting until some other thread calls release().
394
Serhiy Storchaka14867992014-09-10 23:43:41 +0300395 Semaphores also support the context management protocol.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700396
Guido van Rossum085869b2013-11-23 15:09:16 -0800397 The optional argument gives the initial value for the internal
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700398 counter; it defaults to 1. If the value given is less than 0,
399 ValueError is raised.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700400 """
401
Guido van Rossum085869b2013-11-23 15:09:16 -0800402 def __init__(self, value=1, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700403 if value < 0:
Guido van Rossum9c55a582013-11-21 11:07:45 -0800404 raise ValueError("Semaphore initial value must be >= 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700405 self._value = value
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700406 self._waiters = collections.deque()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700407 if loop is not None:
408 self._loop = loop
409 else:
410 self._loop = events.get_event_loop()
411
412 def __repr__(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700413 res = super().__repr__()
Guido van Rossumab3c8892014-01-25 16:51:57 -0800414 extra = 'locked' if self.locked() else 'unlocked,value:{}'.format(
Guido van Rossumccea0842013-11-04 13:18:19 -0800415 self._value)
416 if self._waiters:
417 extra = '{},waiters:{}'.format(extra, len(self._waiters))
418 return '<{} [{}]>'.format(res[1:-1], extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700419
Guido van Rossumd455a502015-09-29 11:54:45 -0700420 def _wake_up_next(self):
421 while self._waiters:
422 waiter = self._waiters.popleft()
423 if not waiter.done():
424 waiter.set_result(None)
425 return
426
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700427 def locked(self):
428 """Returns True if semaphore can not be acquired immediately."""
Guido van Rossumab3c8892014-01-25 16:51:57 -0800429 return self._value == 0
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700430
Victor Stinnerf951d282014-06-29 00:46:45 +0200431 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700432 def acquire(self):
433 """Acquire a semaphore.
434
435 If the internal counter is larger than zero on entry,
436 decrement it by one and return True immediately. If it is
437 zero on entry, block, waiting until some other coroutine has
438 called release() to make it larger than 0, and then return
439 True.
440 """
Guido van Rossumd455a502015-09-29 11:54:45 -0700441 while self._value <= 0:
Yury Selivanov7661db62016-05-16 15:38:39 -0400442 fut = self._loop.create_future()
Guido van Rossumd455a502015-09-29 11:54:45 -0700443 self._waiters.append(fut)
444 try:
445 yield from fut
446 except:
447 # See the similar code in Queue.get.
448 fut.cancel()
449 if self._value > 0 and not fut.cancelled():
450 self._wake_up_next()
451 raise
452 self._value -= 1
453 return True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700454
455 def release(self):
456 """Release a semaphore, incrementing the internal counter by one.
457 When it was zero on entry and another coroutine is waiting for it to
458 become larger than zero again, wake up that coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700459 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700460 self._value += 1
Guido van Rossumd455a502015-09-29 11:54:45 -0700461 self._wake_up_next()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700462
Guido van Rossum085869b2013-11-23 15:09:16 -0800463
464class BoundedSemaphore(Semaphore):
465 """A bounded semaphore implementation.
466
467 This raises ValueError in release() if it would increase the value
468 above the initial value.
469 """
470
471 def __init__(self, value=1, *, loop=None):
472 self._bound_value = value
473 super().__init__(value, loop=loop)
474
475 def release(self):
476 if self._value >= self._bound_value:
477 raise ValueError('BoundedSemaphore released too many times')
478 super().release()