blob: cc6f2bf76f16aac7c7827aa5b568ecb96b3b06a7 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Synchronization primitives."""
2
Guido van Rossuma58d1c32013-11-24 22:32:09 -08003__all__ = ['Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
5import collections
Yury Selivanovd08c3632015-05-13 15:15:56 -04006import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007
Victor Stinner71080fc2015-07-25 02:23:21 +02008from . import compat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009from . import events
10from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020011from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Guido van Rossumab3c8892014-01-25 16:51:57 -080014class _ContextManager:
15 """Context manager.
16
17 This enables the following idiom for acquiring and releasing a
18 lock around a block:
19
20 with (yield from lock):
21 <block>
22
23 while failing loudly when accidentally using:
24
25 with lock:
26 <block>
27 """
28
29 def __init__(self, lock):
30 self._lock = lock
31
32 def __enter__(self):
33 # We have no use for the "as ..." clause in the with
34 # statement for locks.
35 return None
36
37 def __exit__(self, *args):
38 try:
39 self._lock.release()
40 finally:
41 self._lock = None # Crudely prevent reuse.
42
43
class _ContextManagerMixin:
    """Mixin that adds context-manager support to lock-like classes.

    The methods below call self.acquire() (driven with "yield from") and
    self.release(), so the class using this mixin must provide both.
    """

    def __enter__(self):
        # A plain "with lock:" is always a mistake; refuse it outright.
        raise RuntimeError(
            '"yield from" should be used as context manager expression')

    def __exit__(self, *args):
        # Only present because __enter__ is: the with-statement needs
        # both, even though __enter__ never returns normally.
        return None

    @coroutine
    def __iter__(self):
        # Not a coroutine in spirit; it exists so that
        #
        #     with (yield from lock):
        #         <block>
        #
        # can be written instead of the longer
        #
        #     yield from lock.acquire()
        #     try:
        #         <block>
        #     finally:
        #         lock.release()
        yield from self.acquire()
        return _ContextManager(self)

    if compat.PY35:

        def __await__(self):
            # Makes "with await lock" work.
            yield from self.acquire()
            return _ContextManager(self)

        @coroutine
        def __aenter__(self):
            yield from self.acquire()
            # Nothing useful to bind in the "as ..." clause of an
            # "async with" statement for locks.
            return None

        @coroutine
        def __aexit__(self, exc_type, exc, tb):
            self.release()
88
89
class Lock(_ContextManagerMixin):
    """Primitive lock objects.

    A primitive lock is a synchronization primitive that is not owned
    by a particular coroutine when locked.  A primitive lock is in one
    of two states, 'locked' or 'unlocked'.

    It is created in the unlocked state.  It has two basic methods,
    acquire() and release().  When the state is unlocked, acquire()
    changes the state to locked and returns immediately.  When the
    state is locked, acquire() blocks until a call to release() in
    another coroutine changes it to unlocked, then the acquire() call
    resets it to locked and returns.  The release() method should only
    be called in the locked state; it changes the state to unlocked
    and returns immediately.  If an attempt is made to release an
    unlocked lock, a RuntimeError will be raised.

    When more than one coroutine is blocked in acquire() waiting for
    the state to turn to unlocked, only one coroutine proceeds when a
    release() call resets the state to unlocked; the coroutine that
    has been blocked in acquire() the longest is served first.

    acquire() is a coroutine and should be called with 'yield from'.

    Locks also support the context management protocol.  '(yield from lock)'
    should be used as context manager expression.

    Usage:

        lock = Lock()
        ...
        yield from lock.acquire()
        try:
            ...
        finally:
            lock.release()

    Context manager usage:

        lock = Lock()
        ...
        with (yield from lock):
            ...

    Lock objects can be tested for locking state:

        if not lock.locked():
            yield from lock.acquire()
        else:
            # lock is acquired
            ...

    """

    def __init__(self, *, loop=None):
        # FIFO queue of futures, one per coroutine blocked in acquire().
        self._waiters = collections.deque()
        self._locked = False
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self._locked else 'unlocked'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    def locked(self):
        """Return True if lock is acquired."""
        return self._locked

    @coroutine
    def acquire(self):
        """Acquire a lock.

        This method blocks until the lock is unlocked, then sets it to
        locked and returns True.

        If the waiting coroutine is interrupted (for example cancelled)
        after release() has already handed the lock to it, the wake-up
        is passed on to the next waiter instead of being lost.
        """
        # Take the lock right away when it is free and nobody with a live
        # claim is queued.  Cancelled waiters that have not yet had the
        # chance to remove themselves from the queue must not count:
        # release() never wakes them, so queueing behind them on a free
        # lock would block forever.
        if not self._locked and all(w.cancelled() for w in self._waiters):
            self._locked = True
            return True

        fut = futures.Future(loop=self._loop)
        self._waiters.append(fut)
        acquired = False
        try:
            yield from fut
            self._locked = True
            acquired = True
            return True
        finally:
            self._waiters.remove(fut)
            # fut has a result but we never took the lock: release()
            # woke us and then we were interrupted before resuming.
            # Hand the wake-up to the next waiter so it is not lost.
            if (not acquired and not self._locked
                    and fut.done() and not fut.cancelled()):
                self._wake_up_next()

    def release(self):
        """Release a lock.

        When the lock is locked, reset it to unlocked, and return.
        If any other coroutines are blocked waiting for the lock to become
        unlocked, allow exactly one of them to proceed.

        When invoked on an unlocked lock, a RuntimeError is raised.

        There is no return value.
        """
        if not self._locked:
            raise RuntimeError('Lock is not acquired.')
        self._locked = False
        self._wake_up_next()

    def _wake_up_next(self):
        """Wake up the first waiter that is still pending, if any."""
        for fut in self._waiters:
            if not fut.done():
                fut.set_result(True)
                break
203
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700204
class Event:
    """Asynchronous equivalent to threading.Event.

    An event wraps a boolean flag, initially false.  set() turns the flag
    on, clear() turns it off again, and wait() blocks the calling
    coroutine until the flag is on.
    """

    def __init__(self, *, loop=None):
        # Futures of the coroutines currently blocked in wait().
        self._waiters = collections.deque()
        self._value = False
        self._loop = loop if loop is not None else events.get_event_loop()

    def __repr__(self):
        base = super().__repr__()
        state = 'set' if self._value else 'unset'
        if self._waiters:
            state = '{},waiters:{}'.format(state, len(self._waiters))
        # base[1:-1] strips the surrounding angle brackets of the
        # default repr so the state can be appended inside them.
        return '<{} [{}]>'.format(base[1:-1], state)

    def is_set(self):
        """Return True if and only if the internal flag is true."""
        return self._value

    def set(self):
        """Set the internal flag to true.

        All coroutines waiting for the flag to become true are awakened.
        Coroutines that call wait() once the flag is true will not block
        at all.
        """
        if self._value:
            # Already set: the waiters were woken by the earlier call.
            return

        self._value = True
        for waiter in self._waiters:
            if not waiter.done():
                waiter.set_result(True)

    def clear(self):
        """Reset the internal flag to false.

        Subsequently, coroutines calling wait() will block until set() is
        called to set the internal flag to true again.
        """
        self._value = False

    @coroutine
    def wait(self):
        """Block until the internal flag is true.

        Return True immediately if the flag is already true on entry.
        Otherwise block until another coroutine calls set(), then return
        True.
        """
        if self._value:
            return True

        waiter = futures.Future(loop=self._loop)
        self._waiters.append(waiter)
        try:
            yield from waiter
        finally:
            # Leave the queue whether we were woken or interrupted.
            self._waiters.remove(waiter)
        return True
269
270
class Condition(_ContextManagerMixin):
    """Asynchronous equivalent to threading.Condition.

    This class implements condition variable objects.  A condition variable
    allows one or more coroutines to wait until they are notified by another
    coroutine.

    If the lock argument is given and is not None, it is used as the
    underlying lock and must be bound to the same event loop as the
    condition (ValueError is raised otherwise).  If it is omitted, a new
    Lock object is created and used as the underlying lock.
    """

    def __init__(self, lock=None, *, loop=None):
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

        if lock is None:
            lock = Lock(loop=self._loop)
        elif lock._loop is not self._loop:
            raise ValueError("loop argument must agree with lock")

        self._lock = lock
        # Export the lock's locked(), acquire() and release() methods.
        self.locked = lock.locked
        self.acquire = lock.acquire
        self.release = lock.release

        # Futures of the coroutines currently blocked in wait().
        self._waiters = collections.deque()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self.locked() else 'unlocked'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    @coroutine
    def wait(self):
        """Wait until notified.

        If the calling coroutine has not acquired the lock when this
        method is called, a RuntimeError is raised.

        This method releases the underlying lock, and then blocks
        until it is awakened by a notify() or notify_all() call for
        the same condition variable in another coroutine.  Once
        awakened, it re-acquires the lock and returns True.

        The lock is always re-acquired before this method finishes,
        even when the waiting coroutine is cancelled; in that case the
        cancellation is re-raised once the lock is held again.
        """
        if not self.locked():
            raise RuntimeError('cannot wait on un-acquired lock')

        self.release()
        try:
            fut = futures.Future(loop=self._loop)
            self._waiters.append(fut)
            try:
                yield from fut
                return True
            finally:
                self._waiters.remove(fut)

        finally:
            # The caller expects to hold the lock when wait() finishes,
            # whatever the outcome (typically it releases the lock right
            # afterwards).  A cancellation delivered while re-acquiring
            # must therefore not abort the re-acquire: keep trying, and
            # report the cancellation only once the lock is held.
            cancelled = False
            while True:
                try:
                    yield from self.acquire()
                    break
                except futures.CancelledError:
                    cancelled = True

            if cancelled:
                raise futures.CancelledError

    @coroutine
    def wait_for(self, predicate):
        """Wait until a predicate becomes true.

        The predicate should be a callable whose result will be
        interpreted as a boolean value.  The final predicate value is
        the return value.
        """
        result = predicate()
        while not result:
            yield from self.wait()
            result = predicate()
        return result

    def notify(self, n=1):
        """By default, wake up one coroutine waiting on this condition, if any.
        If the calling coroutine has not acquired the lock when this method
        is called, a RuntimeError is raised.

        This method wakes up at most n of the coroutines waiting for the
        condition variable; it is a no-op if no coroutines are waiting.

        Note: an awakened coroutine does not actually return from its
        wait() call until it can reacquire the lock.  Since notify() does
        not release the lock, its caller should.
        """
        if not self.locked():
            raise RuntimeError('cannot notify on un-acquired lock')

        idx = 0
        for fut in self._waiters:
            if idx >= n:
                break

            # Waiters that are already done (woken or cancelled) do not
            # count towards n.
            if not fut.done():
                idx += 1
                fut.set_result(False)

    def notify_all(self):
        """Wake up all coroutines waiting on this condition.  This method
        acts like notify(), but wakes up all waiting coroutines instead of
        one.  If the calling coroutine has not acquired the lock when this
        method is called, a RuntimeError is raised.
        """
        self.notify(len(self._waiters))
380
Guido van Rossumccea0842013-11-04 13:18:19 -0800381
class Semaphore(_ContextManagerMixin):
    """A Semaphore implementation.

    A semaphore manages an internal counter which is decremented by each
    acquire() call and incremented by each release() call.  The counter
    can never go below zero; when acquire() finds that it is zero, it blocks,
    waiting until some other coroutine calls release().

    Semaphores also support the context management protocol.

    The optional argument gives the initial value for the internal
    counter; it defaults to 1.  If the value given is less than 0,
    ValueError is raised.
    """

    def __init__(self, value=1, *, loop=None):
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._value = value
        # FIFO queue of futures, one per coroutine blocked in acquire().
        self._waiters = collections.deque()
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self.locked() else 'unlocked,value:{}'.format(
            self._value)
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    def locked(self):
        """Returns True if semaphore can not be acquired immediately."""
        return self._value == 0

    @coroutine
    def acquire(self):
        """Acquire a semaphore.

        If the internal counter is larger than zero on entry,
        decrement it by one and return True immediately.  If it is
        zero on entry, block, waiting until some other coroutine has
        called release() to make it larger than 0, and then return
        True.

        If the waiting coroutine is interrupted (for example cancelled)
        after release() has already woken it, the wake-up is passed on
        to the next waiter instead of being lost.
        """
        # Take the semaphore right away when the counter allows it and
        # nobody with a live claim is queued.  Cancelled waiters that
        # have not yet removed themselves from the queue must not count:
        # release() never wakes them, so queueing behind them while the
        # counter is positive would block until the next release().
        if self._value > 0 and all(w.cancelled() for w in self._waiters):
            self._value -= 1
            return True

        fut = futures.Future(loop=self._loop)
        self._waiters.append(fut)
        acquired = False
        try:
            yield from fut
            self._value -= 1
            acquired = True
            return True
        finally:
            self._waiters.remove(fut)
            # fut has a result but we never decremented the counter:
            # release() woke us and then we were interrupted before
            # resuming.  Hand the wake-up to the next waiter so that the
            # unit made available by release() does not go unused.
            if (not acquired and self._value > 0
                    and fut.done() and not fut.cancelled()):
                self._wake_up_next()

    def release(self):
        """Release a semaphore, incrementing the internal counter by one.
        When it was zero on entry and another coroutine is waiting for it to
        become larger than zero again, wake up that coroutine.
        """
        self._value += 1
        self._wake_up_next()

    def _wake_up_next(self):
        """Wake up the first waiter that is still pending, if any."""
        for waiter in self._waiters:
            if not waiter.done():
                waiter.set_result(True)
                break
452
Guido van Rossum085869b2013-11-23 15:09:16 -0800453
class BoundedSemaphore(Semaphore):
    """A semaphore whose counter may never exceed its initial value.

    release() raises ValueError instead of pushing the counter above
    the value the semaphore was created with.
    """

    def __init__(self, value=1, *, loop=None):
        # Remember the ceiling before the base class takes over.
        self._bound_value = value
        super().__init__(value, loop=loop)

    def release(self):
        at_ceiling = self._value >= self._bound_value
        if at_ceiling:
            raise ValueError('BoundedSemaphore released too many times')
        super().release()