blob: 34f6bc16ad87b69743d0918bfc5e779d25d2bdc7 [file] [log] [blame]
"""Synchronization primitives."""

__all__ = ['Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore']

import collections

from . import compat
from . import events
from . import futures
from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
12
class _ContextManager:
    """Context manager.

    This enables the following idiom for acquiring and releasing a
    lock around a block:

        with (yield from lock):
            <block>

    while failing loudly when accidentally using:

        with lock:
            <block>

    The manager never acquires the lock itself; it only calls the
    lock's release() method when the with-block is left.
    """

    def __init__(self, lock):
        # Any object exposing a release() method works here.
        self._lock = lock

    def __enter__(self):
        # We have no use for the "as ..." clause in the with
        # statement for locks.
        return None

    def __exit__(self, *args):
        # Returns None, so an exception raised inside the with-block is
        # never suppressed.
        try:
            self._lock.release()
        finally:
            self._lock = None  # Crudely prevent reuse.
41
42
class _ContextManagerMixin:
    """Mixin giving a primitive the "with (yield from obj):" idiom.

    The class using this mixin must provide an acquire() coroutine and
    a release() method; every method below is written in terms of those
    two.
    """

    def __enter__(self):
        # A plain "with obj:" cannot wait for the lock, so fail loudly
        # instead of silently running the block unprotected.
        raise RuntimeError(
            '"yield from" should be used as context manager expression')

    def __exit__(self, *args):
        # This must exist because __enter__ exists, even though that
        # always raises; that's how the with-statement works.
        pass

    @coroutine
    def __iter__(self):
        # Not meant to be called as an ordinary coroutine.  It exists
        # to enable the idiom:
        #
        #     with (yield from lock):
        #         <block>
        #
        # as an alternative to:
        #
        #     yield from lock.acquire()
        #     try:
        #         <block>
        #     finally:
        #         lock.release()
        yield from self.acquire()
        return _ContextManager(self)

    if compat.PY35:

        def __await__(self):
            # To make "with await lock" work.
            yield from self.acquire()
            return _ContextManager(self)

        @coroutine
        def __aenter__(self):
            yield from self.acquire()
            # We have no use for the "as ..." clause in the with
            # statement for locks.
            return None

        @coroutine
        def __aexit__(self, exc_type, exc, tb):
            # Returns None, so exceptions from the "async with" body
            # propagate unchanged.
            self.release()
87
88
class Lock(_ContextManagerMixin):
    """Primitive lock objects.

    A primitive lock is a synchronization primitive that is not owned
    by a particular coroutine when locked.  A primitive lock is in one
    of two states, 'locked' or 'unlocked'.

    It is created in the unlocked state.  It has two basic methods,
    acquire() and release().  When the state is unlocked, acquire()
    changes the state to locked and returns immediately.  When the
    state is locked, acquire() blocks until a call to release() in
    another coroutine changes it to unlocked, then the acquire() call
    resets it to locked and returns.  The release() method should only
    be called in the locked state; it changes the state to unlocked
    and returns immediately.  If an attempt is made to release an
    unlocked lock, a RuntimeError will be raised.

    When more than one coroutine is blocked in acquire() waiting for
    the state to turn to unlocked, only one coroutine proceeds when a
    release() call resets the state to unlocked; first coroutine which
    is blocked in acquire() is being processed.

    acquire() is a coroutine and should be called with 'yield from'.

    Locks also support the context management protocol.  '(yield from lock)'
    should be used as context manager expression.

    Usage:

        lock = Lock()
        ...
        yield from lock
        try:
            ...
        finally:
            lock.release()

    Context manager usage:

        lock = Lock()
        ...
        with (yield from lock):
            ...

    Lock objects can be tested for locking state:

        if not lock.locked():
            yield from lock
        else:
            # lock is acquired
            ...

    """

    def __init__(self, *, loop=None):
        # FIFO queue of futures, one per coroutine blocked in acquire().
        self._waiters = collections.deque()
        self._locked = False
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self._locked else 'unlocked'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    def locked(self):
        """Return True if lock is acquired."""
        return self._locked

    @coroutine
    def acquire(self):
        """Acquire a lock.

        This method blocks until the lock is unlocked, then sets it to
        locked and returns True.

        If the waiting coroutine is cancelled, the cancellation is
        re-raised; when the lock is free at that moment the wake-up is
        handed to the waiter at the head of the queue so it is not lost.
        """
        # Cancelled waiters stay in the queue until their coroutine gets
        # to run its cleanup, so they must not count as real contenders:
        # nobody would ever wake a coroutine queued behind them while the
        # lock is free.
        if not self._locked and all(w.cancelled() for w in self._waiters):
            self._locked = True
            return True

        fut = futures.Future(loop=self._loop)
        self._waiters.append(fut)
        try:
            try:
                yield from fut
            finally:
                self._waiters.remove(fut)
        except futures.CancelledError:
            # release() may already have resolved our future before the
            # cancellation arrived.  In that case the lock is free and we
            # were the one meant to take it, so pass the wake-up on.
            if not self._locked:
                self._wake_up_first()
            raise

        self._locked = True
        return True

    def release(self):
        """Release a lock.

        When the lock is locked, reset it to unlocked, and return.
        If any other coroutines are blocked waiting for the lock to become
        unlocked, allow exactly one of them to proceed.

        When invoked on an unlocked lock, a RuntimeError is raised.

        There is no return value.
        """
        if self._locked:
            self._locked = False
            self._wake_up_first()
        else:
            raise RuntimeError('Lock is not acquired.')

    def _wake_up_first(self):
        """Wake up the waiter at the head of the queue if it isn't done."""
        if not self._waiters:
            return
        fut = self._waiters[0]
        # A head future that is already done belongs to a coroutine that
        # will run shortly: it either takes the lock, or, if it was
        # cancelled while the lock is free, comes back here (from
        # acquire()) and wakes the next waiter.
        if not fut.done():
            fut.set_result(True)
202
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700203
class Event:
    """Asynchronous equivalent to threading.Event.

    Class implementing event objects.  An event manages a flag that can be
    set to true with the set() method and reset to false with the clear()
    method.  The wait() method blocks until the flag is true.  The flag is
    initially false.
    """

    def __init__(self, *, loop=None):
        # Futures of the coroutines currently blocked in wait().
        self._waiters = collections.deque()
        self._value = False
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        res = super().__repr__()
        extra = 'set' if self._value else 'unset'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    def is_set(self):
        """Return True if and only if the internal flag is true."""
        return self._value

    def set(self):
        """Set the internal flag to true.

        All coroutines waiting for it to become true are awakened.
        Coroutines that call wait() once the flag is true will not block
        at all.
        """
        if not self._value:
            self._value = True

            # Futures that are already done (e.g. cancelled) are skipped;
            # their wait() call removes them from the queue itself.
            for fut in self._waiters:
                if not fut.done():
                    fut.set_result(True)

    def clear(self):
        """Reset the internal flag to false.

        Subsequently, coroutines calling wait() will block until set() is
        called to set the internal flag to true again.
        """
        self._value = False

    @coroutine
    def wait(self):
        """Block until the internal flag is true.

        If the internal flag is true on entry, return True
        immediately.  Otherwise, block until another coroutine calls
        set() to set the flag to true, then return True.
        """
        if self._value:
            return True

        fut = futures.Future(loop=self._loop)
        self._waiters.append(fut)
        try:
            yield from fut
            return True
        finally:
            # Runs on normal wake-up and on cancellation alike, so the
            # queue never keeps a stale future.
            self._waiters.remove(fut)
268
269
class Condition(_ContextManagerMixin):
    """Asynchronous equivalent to threading.Condition.

    This class implements condition variable objects.  A condition variable
    allows one or more coroutines to wait until they are notified by another
    coroutine.

    If no lock is passed in, a new Lock object is created and used as the
    underlying lock.
    """

    def __init__(self, lock=None, *, loop=None):
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

        if lock is None:
            lock = Lock(loop=self._loop)
        elif lock._loop is not self._loop:
            raise ValueError("loop argument must agree with lock")

        self._lock = lock
        # Export the lock's locked(), acquire() and release() methods.
        self.locked = lock.locked
        self.acquire = lock.acquire
        self.release = lock.release

        # Futures of the coroutines currently blocked in wait().
        self._waiters = collections.deque()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self.locked() else 'unlocked'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    @coroutine
    def wait(self):
        """Wait until notified.

        If the calling coroutine has not acquired the lock when this
        method is called, a RuntimeError is raised.

        This method releases the underlying lock, and then blocks
        until it is awakened by a notify() or notify_all() call for
        the same condition variable in another coroutine.  Once
        awakened, it re-acquires the lock and returns True.

        The lock is re-acquired before this method exits even when the
        wait is cancelled; the cancellation is re-raised afterwards.
        """
        if not self.locked():
            raise RuntimeError('cannot wait on un-acquired lock')

        self.release()
        try:
            fut = futures.Future(loop=self._loop)
            self._waiters.append(fut)
            try:
                yield from fut
                return True
            finally:
                self._waiters.remove(fut)

        finally:
            # Must reacquire the lock even if the wait is cancelled: the
            # caller's with-block will call release() on the way out.  A
            # cancellation that arrives while re-acquiring is remembered
            # and re-raised once the lock is held again.
            cancelled = False
            while True:
                try:
                    yield from self.acquire()
                    break
                except futures.CancelledError:
                    cancelled = True

            if cancelled:
                raise futures.CancelledError

    @coroutine
    def wait_for(self, predicate):
        """Wait until a predicate becomes true.

        The predicate should be a callable which result will be
        interpreted as a boolean value.  The final predicate value is
        the return value.
        """
        result = predicate()
        while not result:
            yield from self.wait()
            result = predicate()
        return result

    def notify(self, n=1):
        """By default, wake up one coroutine waiting on this condition, if any.
        If the calling coroutine has not acquired the lock when this method
        is called, a RuntimeError is raised.

        This method wakes up at most n of the coroutines waiting for the
        condition variable; it is a no-op if no coroutines are waiting.

        Note: an awakened coroutine does not actually return from its
        wait() call until it can reacquire the lock.  Since notify() does
        not release the lock, its caller should.
        """
        if not self.locked():
            raise RuntimeError('cannot notify on un-acquired lock')

        idx = 0
        for fut in self._waiters:
            if idx >= n:
                break

            # Only waiters that are still pending count towards n.
            if not fut.done():
                idx += 1
                fut.set_result(False)

    def notify_all(self):
        """Wake up all coroutines waiting on this condition.  This method
        acts like notify(), but wakes up all waiting coroutines instead of
        one.  If the calling coroutine has not acquired the lock when this
        method is called, a RuntimeError is raised.
        """
        self.notify(len(self._waiters))
379
Guido van Rossumccea0842013-11-04 13:18:19 -0800380
class Semaphore(_ContextManagerMixin):
    """A Semaphore implementation.

    A semaphore manages an internal counter which is decremented by each
    acquire() call and incremented by each release() call.  The counter
    can never go below zero; when acquire() finds that it is zero, it
    blocks, waiting until some other coroutine calls release().

    Semaphores also support the context management protocol.

    The optional argument gives the initial value for the internal
    counter; it defaults to 1.  If the value given is less than 0,
    ValueError is raised.
    """

    def __init__(self, value=1, *, loop=None):
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._value = value
        # Futures of the coroutines currently blocked in acquire().
        self._waiters = collections.deque()
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self.locked() else 'unlocked,value:{}'.format(
            self._value)
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    def _wake_up_next(self):
        # Pop waiters until one that is still pending is found and wake
        # it; futures that are already done are discarded along the way.
        while self._waiters:
            waiter = self._waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                return

    def locked(self):
        """Returns True if semaphore can not be acquired immediately."""
        return self._value == 0

    @coroutine
    def acquire(self):
        """Acquire a semaphore.

        If the internal counter is larger than zero on entry,
        decrement it by one and return True immediately.  If it is
        zero on entry, block, waiting until some other coroutine has
        called release() to make it larger than 0, and then return
        True.
        """
        # Re-check the counter after every wake-up: another coroutine may
        # have taken the slot before this one got to run.
        while self._value <= 0:
            fut = futures.Future(loop=self._loop)
            self._waiters.append(fut)
            try:
                yield from fut
            except BaseException:
                # Deliberately broad; the exception is always re-raised.
                # See the similar code in Queue.get.
                fut.cancel()
                # cancel() has no effect on a future that release()
                # already resolved; in that case hand the wake-up to the
                # next waiter so the free slot is not lost.
                if self._value > 0 and not fut.cancelled():
                    self._wake_up_next()
                raise
        self._value -= 1
        return True

    def release(self):
        """Release a semaphore, incrementing the internal counter by one.
        When it was zero on entry and another coroutine is waiting for it to
        become larger than zero again, wake up that coroutine.
        """
        self._value += 1
        self._wake_up_next()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700456
Guido van Rossum085869b2013-11-23 15:09:16 -0800457
class BoundedSemaphore(Semaphore):
    """A bounded semaphore implementation.

    This raises ValueError in release() if it would increase the value
    above the initial value.
    """

    def __init__(self, value=1, *, loop=None):
        # Remember the initial value as the upper bound for release().
        self._bound_value = value
        super().__init__(value, loop=loop)

    def release(self):
        """Release the semaphore, refusing to exceed the initial value.

        Raises ValueError when the counter already equals (or exceeds)
        the initial value; otherwise behaves like Semaphore.release().
        """
        if self._value >= self._bound_value:
            raise ValueError('BoundedSemaphore released too many times')
        super().release()
472 super().release()