# Source blob: b2e516b5430ef9ffe0a2b114f4268cd0fae7e0bc
"""Synchronization primitives."""

__all__ = ['Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore']

import collections
import sys

from . import events
from . import futures
from .coroutines import coroutine


# True when running on Python 3.5 or newer.  It gates the definition of
# the __await__ / __aenter__ / __aexit__ methods in _ContextManagerMixin.
_PY35 = sys.version_info >= (3, 5)
14
15
class _ContextManager:
    """Context manager.

    This enables the following idiom for acquiring and releasing a
    lock around a block:

        with (yield from lock):
            <block>

    while failing loudly when accidentally using:

        with lock:
            <block>

    The wrapped object only needs a release() method; it is expected to
    have been acquired already by whoever creates this helper.
    """

    def __init__(self, lock):
        # The already-acquired primitive to release on exit.
        self._lock = lock

    def __enter__(self):
        # We have no use for the "as ..." clause in the with
        # statement for locks.
        return None

    def __exit__(self, *args):
        # Release unconditionally; exceptions from the with-body are not
        # suppressed because nothing truthy is returned.
        try:
            self._lock.release()
        finally:
            self._lock = None  # Crudely prevent reuse.
44
45
class _ContextManagerMixin:
    """Mixin adding context-manager support to a lock-like primitive.

    The class mixing this in must provide an acquire() coroutine and a
    release() method.  A plain "with obj:" always fails with
    RuntimeError; the supported forms are "with (yield from obj):" and,
    on Python 3.5+, "with await obj:" and "async with obj:".
    """

    def __enter__(self):
        raise RuntimeError(
            '"yield from" should be used as context manager expression')

    def __exit__(self, *args):
        # This must exist because __enter__ exists, even though that
        # always raises; that's how the with-statement works.
        pass

    @coroutine
    def __iter__(self):
        # This is not a coroutine.  It is meant to enable the idiom:
        #
        #     with (yield from lock):
        #         <block>
        #
        # as an alternative to:
        #
        #     yield from lock.acquire()
        #     try:
        #         <block>
        #     finally:
        #         lock.release()
        yield from self.acquire()
        return _ContextManager(self)

    if _PY35:

        def __await__(self):
            # To make "with await lock" work.
            yield from self.acquire()
            return _ContextManager(self)

        @coroutine
        def __aenter__(self):
            yield from self.acquire()
            # We have no use for the "as ..." clause in the with
            # statement for locks.
            return None

        @coroutine
        def __aexit__(self, exc_type, exc, tb):
            # Always release; the exception (if any) is not suppressed.
            self.release()
90
91
class Lock(_ContextManagerMixin):
    """Primitive lock objects.

    A primitive lock is a synchronization primitive that is not owned
    by a particular coroutine when locked.  A primitive lock is in one
    of two states, 'locked' or 'unlocked'.

    It is created in the unlocked state.  It has two basic methods,
    acquire() and release().  When the state is unlocked, acquire()
    changes the state to locked and returns immediately.  When the
    state is locked, acquire() blocks until a call to release() in
    another coroutine changes it to unlocked, then the acquire() call
    resets it to locked and returns.  The release() method should only
    be called in the locked state; it changes the state to unlocked
    and returns immediately.  If an attempt is made to release an
    unlocked lock, a RuntimeError will be raised.

    When more than one coroutine is blocked in acquire() waiting for
    the state to turn to unlocked, only one coroutine proceeds when a
    release() call resets the state to unlocked; first coroutine which
    is blocked in acquire() is being processed.

    acquire() is a coroutine and should be called with 'yield from'.

    Locks also support the context management protocol.  '(yield from lock)'
    should be used as context manager expression.

    Usage:

        lock = Lock()
        ...
        yield from lock
        try:
            ...
        finally:
            lock.release()

    Context manager usage:

        lock = Lock()
        ...
        with (yield from lock):
            ...

    Lock objects can be tested for locking state:

        if not lock.locked():
            yield from lock
        else:
            # lock is acquired
            ...

    """

    def __init__(self, *, loop=None):
        # FIFO queue of futures, one per coroutine blocked in acquire().
        self._waiters = collections.deque()
        self._locked = False
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self._locked else 'unlocked'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    def locked(self):
        """Return True if lock is acquired."""
        return self._locked

    @coroutine
    def acquire(self):
        """Acquire a lock.

        This method blocks until the lock is unlocked, then sets it to
        locked and returns True.

        If the wait is cancelled, CancelledError propagates and the lock
        is not taken by this caller.
        """
        # Cancelled waiters that have not yet removed themselves from the
        # queue must not make a free lock look contended; otherwise this
        # caller would queue up with nobody left to wake it.
        if not self._locked and all(w.cancelled() for w in self._waiters):
            self._locked = True
            return True

        fut = futures.Future(loop=self._loop)
        self._waiters.append(fut)

        # The inner finally must run before the CancelledError handler so
        # that _wake_up_first() never tries to wake this very waiter.
        try:
            try:
                yield from fut
            finally:
                self._waiters.remove(fut)
        except futures.CancelledError:
            # If release() had already picked this waiter, the wake-up
            # would otherwise be lost; hand it to the next one in line.
            if not self._locked:
                self._wake_up_first()
            raise

        self._locked = True
        return True

    def release(self):
        """Release a lock.

        When the lock is locked, reset it to unlocked, and return.
        If any other coroutines are blocked waiting for the lock to become
        unlocked, allow exactly one of them to proceed.

        When invoked on an unlocked lock, a RuntimeError is raised.

        There is no return value.
        """
        if self._locked:
            self._locked = False
            self._wake_up_first()
        else:
            raise RuntimeError('Lock is not acquired.')

    def _wake_up_first(self):
        """Wake up the waiter at the head of the queue if it is pending."""
        if not self._waiters:
            return
        fut = self._waiters[0]
        # A head waiter that is already done will resume later and either
        # take the lock or, if it was cancelled while the lock is free,
        # come back here and wake its successor.  Waking a second waiter
        # now could let two coroutines believe they hold the lock.
        if not fut.done():
            fut.set_result(True)
205
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700206
class Event:
    """Asynchronous equivalent to threading.Event.

    Class implementing event objects.  An event manages a flag that can be set
    to true with the set() method and reset to false with the clear() method.
    The wait() method blocks until the flag is true.  The flag is initially
    false.
    """

    def __init__(self, *, loop=None):
        # Futures of the coroutines currently blocked in wait().
        self._waiters = collections.deque()
        self._value = False
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        res = super().__repr__()
        extra = 'set' if self._value else 'unset'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    def is_set(self):
        """Return True if and only if the internal flag is true."""
        return self._value

    def set(self):
        """Set the internal flag to true.  All coroutines waiting for it to
        become true are awakened.  Coroutines that call wait() once the
        flag is true will not block at all.
        """
        if not self._value:
            self._value = True

            for fut in self._waiters:
                # Skip waiters that were cancelled in the meantime.
                if not fut.done():
                    fut.set_result(True)

    def clear(self):
        """Reset the internal flag to false.  Subsequently, coroutines calling
        wait() will block until set() is called to set the internal flag
        to true again."""
        self._value = False

    @coroutine
    def wait(self):
        """Block until the internal flag is true.

        If the internal flag is true on entry, return True
        immediately.  Otherwise, block until another coroutine calls
        set() to set the flag to true, then return True.
        """
        if self._value:
            return True

        fut = futures.Future(loop=self._loop)
        self._waiters.append(fut)
        try:
            yield from fut
            return True
        finally:
            # Runs on normal wake-up and on cancellation alike.
            self._waiters.remove(fut)
271
272
class Condition(_ContextManagerMixin):
    """Asynchronous equivalent to threading.Condition.

    This class implements condition variable objects.  A condition variable
    allows one or more coroutines to wait until they are notified by another
    coroutine.

    If the lock argument is omitted, a new Lock object is created and used
    as the underlying lock.  A lock passed in explicitly must be bound to
    the same event loop as the condition, otherwise ValueError is raised.
    """

    def __init__(self, lock=None, *, loop=None):
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

        if lock is None:
            lock = Lock(loop=self._loop)
        elif lock._loop is not self._loop:
            raise ValueError("loop argument must agree with lock")

        self._lock = lock
        # Export the lock's locked(), acquire() and release() methods.
        self.locked = lock.locked
        self.acquire = lock.acquire
        self.release = lock.release

        # Futures of the coroutines currently blocked in wait().
        self._waiters = collections.deque()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self.locked() else 'unlocked'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    @coroutine
    def wait(self):
        """Wait until notified.

        If the calling coroutine has not acquired the lock when this
        method is called, a RuntimeError is raised.

        This method releases the underlying lock, and then blocks
        until it is awakened by a notify() or notify_all() call for
        the same condition variable in another coroutine.  Once
        awakened, it re-acquires the lock and returns True.

        The lock is re-acquired even when the wait is cancelled; the
        CancelledError is raised only after the lock is held again.
        """
        if not self.locked():
            raise RuntimeError('cannot wait on un-acquired lock')

        self.release()
        try:
            fut = futures.Future(loop=self._loop)
            self._waiters.append(fut)
            try:
                yield from fut
                return True
            finally:
                self._waiters.remove(fut)

        finally:
            # Must reacquire the lock even if the wait is cancelled: the
            # caller (typically a with-block) will release it on the way
            # out.  A cancellation that arrives while re-acquiring is
            # remembered and re-raised once the lock is held.
            cancelled = False
            while True:
                try:
                    yield from self.acquire()
                    break
                except futures.CancelledError:
                    cancelled = True

            if cancelled:
                raise futures.CancelledError

    @coroutine
    def wait_for(self, predicate):
        """Wait until a predicate becomes true.

        The predicate should be a callable which result will be
        interpreted as a boolean value.  The final predicate value is
        the return value.
        """
        result = predicate()
        while not result:
            yield from self.wait()
            result = predicate()
        return result

    def notify(self, n=1):
        """By default, wake up one coroutine waiting on this condition, if any.
        If the calling coroutine has not acquired the lock when this method
        is called, a RuntimeError is raised.

        This method wakes up at most n of the coroutines waiting for the
        condition variable; it is a no-op if no coroutines are waiting.

        Note: an awakened coroutine does not actually return from its
        wait() call until it can reacquire the lock.  Since notify() does
        not release the lock, its caller should.
        """
        if not self.locked():
            raise RuntimeError('cannot notify on un-acquired lock')

        idx = 0
        for fut in self._waiters:
            if idx >= n:
                break

            # Waiters that are already done (woken or cancelled) do not
            # count against n.
            if not fut.done():
                idx += 1
                fut.set_result(False)

    def notify_all(self):
        """Wake up all coroutines waiting on this condition.  This method
        acts like notify(), but wakes up all waiting coroutines instead of
        one.  If the calling coroutine has not acquired the lock when this
        method is called, a RuntimeError is raised.
        """
        self.notify(len(self._waiters))
382
Guido van Rossumccea0842013-11-04 13:18:19 -0800383
class Semaphore(_ContextManagerMixin):
    """A Semaphore implementation.

    A semaphore manages an internal counter which is decremented by each
    acquire() call and incremented by each release() call.  The counter
    can never go below zero; when acquire() finds that it is zero, it blocks,
    waiting until some other coroutine calls release().

    Semaphores also support the context management protocol.

    The optional argument gives the initial value for the internal
    counter; it defaults to 1.  If the value given is less than 0,
    ValueError is raised.
    """

    def __init__(self, value=1, *, loop=None):
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        # Number of permits that are free, i.e. neither held nor already
        # handed to a woken waiter.
        self._value = value
        # FIFO queue of futures, one per coroutine blocked in acquire().
        self._waiters = collections.deque()
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self.locked() else 'unlocked,value:{}'.format(
            self._value)
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    def locked(self):
        """Returns True if semaphore can not be acquired immediately."""
        return self._value == 0

    @coroutine
    def acquire(self):
        """Acquire a semaphore.

        If the internal counter is larger than zero on entry and no
        earlier caller is still queued, decrement it by one and return
        True immediately.  Otherwise block, waiting until some other
        coroutine has called release() and this caller reaches the head
        of the queue, and then return True.

        If the wait is cancelled, CancelledError propagates and no
        permit is consumed.
        """
        # Cancelled waiters that have not yet removed themselves must not
        # make a free permit look contended; live waiters keep FIFO order.
        if self._value > 0 and all(w.cancelled() for w in self._waiters):
            self._value -= 1
            return True

        fut = futures.Future(loop=self._loop)
        self._waiters.append(fut)

        # The inner finally must run before the CancelledError handler so
        # that _wake_up_next() never picks this very waiter.
        try:
            try:
                yield from fut
            finally:
                self._waiters.remove(fut)
        except futures.CancelledError:
            if fut.done() and not fut.cancelled():
                # A permit had already been handed to this waiter; give
                # it back and pass it on so it is not stranded.
                self._value += 1
                self._wake_up_next()
            raise

        # The permit was taken on our behalf in _wake_up_next().  If more
        # permits are free, a coroutine that queued up behind us in the
        # meantime must be woken, since no release() will do it.
        if self._value > 0:
            self._wake_up_next()
        return True

    def release(self):
        """Release a semaphore, incrementing the internal counter by one.
        When it was zero on entry and another coroutine is waiting for it to
        become larger than zero again, wake up that coroutine.
        """
        self._value += 1
        self._wake_up_next()

    def _wake_up_next(self):
        """Hand one free permit to the first waiter that is still pending."""
        for waiter in self._waiters:
            if not waiter.done():
                # Take the permit now, on the waiter's behalf, so nobody
                # else can grab it before the waiter gets to run.
                self._value -= 1
                waiter.set_result(True)
                return
454
Guido van Rossum085869b2013-11-23 15:09:16 -0800455
class BoundedSemaphore(Semaphore):
    """A bounded semaphore implementation.

    This raises ValueError in release() if it would increase the value
    above the initial value.
    """

    def __init__(self, value=1, *, loop=None):
        # Remember the initial value as the upper bound for release().
        self._bound_value = value
        super().__init__(value, loop=loop)

    def release(self):
        """Release the semaphore; raise ValueError if already at the bound."""
        if self._value >= self._bound_value:
            raise ValueError('BoundedSemaphore released too many times')
        super().release()