# Source blob: 41a68c6c8e1d973ad389e0603f3b0715a038f387
"""Synchronization primitives."""

__all__ = ['Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore']

import collections

from . import events
from . import futures
from .coroutines import coroutine


Guido van Rossumab3c8892014-01-25 16:51:57 -080012class _ContextManager:
13 """Context manager.
14
15 This enables the following idiom for acquiring and releasing a
16 lock around a block:
17
18 with (yield from lock):
19 <block>
20
21 while failing loudly when accidentally using:
22
23 with lock:
24 <block>
25 """
26
27 def __init__(self, lock):
28 self._lock = lock
29
30 def __enter__(self):
31 # We have no use for the "as ..." clause in the with
32 # statement for locks.
33 return None
34
35 def __exit__(self, *args):
36 try:
37 self._lock.release()
38 finally:
39 self._lock = None # Crudely prevent reuse.
40
41
class Lock:
    """Primitive lock objects.

    A primitive lock is a synchronization primitive that is not owned
    by a particular coroutine when locked.  A primitive lock is in one
    of two states, 'locked' or 'unlocked'.

    It is created in the unlocked state.  It has two basic methods,
    acquire() and release().  When the state is unlocked, acquire()
    changes the state to locked and returns immediately.  When the
    state is locked, acquire() blocks until a call to release() in
    another coroutine changes it to unlocked, then the acquire() call
    resets it to locked and returns.  The release() method should only
    be called in the locked state; it changes the state to unlocked
    and returns immediately.  If an attempt is made to release an
    unlocked lock, a RuntimeError will be raised.

    When more than one coroutine is blocked in acquire() waiting for
    the state to turn to unlocked, only one coroutine proceeds when a
    release() call resets the state to unlocked; the coroutine that
    has been blocked in acquire() the longest is the one that proceeds.

    acquire() is a coroutine and should be called with 'yield from'.

    Locks also support the context management protocol.  '(yield from lock)'
    should be used as context manager expression.

    Usage:

        lock = Lock()
        ...
        yield from lock.acquire()
        try:
            ...
        finally:
            lock.release()

    Context manager usage:

        lock = Lock()
        ...
        with (yield from lock):
             ...

    Lock objects can be tested for locking state:

        if not lock.locked():
           yield from lock.acquire()
        else:
           # lock is acquired
           ...

    """

    def __init__(self, *, loop=None):
        # FIFO queue of futures, one per coroutine blocked in acquire().
        self._waiters = collections.deque()
        self._locked = False
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self._locked else 'unlocked'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    def locked(self):
        """Return True if lock is acquired."""
        return self._locked

    @coroutine
    def acquire(self):
        """Acquire a lock.

        This method blocks until the lock is unlocked, then sets it to
        locked and returns True.

        If the wait is interrupted (for example the calling task is
        cancelled) after release() already woke this waiter, the wakeup
        is handed to the next waiter so that it is not lost.
        """
        # Only take the fast path when nobody is queued, so that
        # earlier waiters keep their place in line.
        if not self._waiters and not self._locked:
            self._locked = True
            return True

        fut = futures.Future(loop=self._loop)
        self._waiters.append(fut)
        acquired = False
        try:
            yield from fut
            self._locked = True
            acquired = True
            return True
        finally:
            # Remove ourselves first so _wake_up_first() never picks
            # this waiter again.
            self._waiters.remove(fut)
            if not acquired and not self._locked:
                # We are leaving without the lock while it is free:
                # make sure some remaining waiter gets to run.
                self._wake_up_first()

    def release(self):
        """Release a lock.

        When the lock is locked, reset it to unlocked, and return.
        If any other coroutines are blocked waiting for the lock to become
        unlocked, allow exactly one of them to proceed.

        When invoked on an unlocked lock, a RuntimeError is raised.

        There is no return value.
        """
        if self._locked:
            self._locked = False
            self._wake_up_first()
        else:
            raise RuntimeError('Lock is not acquired.')

    def _wake_up_first(self):
        """Wake up the first waiter that is still pending, if any.

        Cancelled waiters are skipped.  If a waiter has already been
        woken but has not resumed yet, nothing is done: waking a second
        one would let two coroutines take the lock.
        """
        for fut in self._waiters:
            if not fut.done():
                fut.set_result(True)
                return
            if not fut.cancelled():
                # A wakeup is already in flight.
                return

    def __enter__(self):
        raise RuntimeError(
            '"yield from" should be used as context manager expression')

    def __exit__(self, *args):
        # This must exist because __enter__ exists, even though that
        # always raises; that's how the with-statement works.
        pass

    @coroutine
    def __iter__(self):
        # This is not meant to be called directly.  It exists to
        # enable the idiom:
        #
        #     with (yield from lock):
        #         <block>
        #
        # as an alternative to:
        #
        #     yield from lock.acquire()
        #     try:
        #         <block>
        #     finally:
        #         lock.release()
        yield from self.acquire()
        return _ContextManager(self)


class Event:
    """Asynchronous equivalent to threading.Event.

    Class implementing event objects. An event manages a flag that can be set
    to true with the set() method and reset to false with the clear() method.
    The wait() method blocks until the flag is true. The flag is initially
    false.
    """

    def __init__(self, *, loop=None):
        # Futures of the coroutines currently blocked in wait().
        self._waiters = collections.deque()
        self._value = False
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        res = super().__repr__()
        extra = 'set' if self._value else 'unset'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    def is_set(self):
        """Return True if and only if the internal flag is true."""
        return self._value

    def set(self):
        """Set the internal flag to true. All coroutines waiting for it to
        become true are awakened. Coroutines that call wait() once the flag
        is true will not block at all.
        """
        if not self._value:
            self._value = True

            # Waiters remove themselves from the queue when they
            # resume; here we only complete their futures.
            for fut in self._waiters:
                if not fut.done():
                    fut.set_result(True)

    def clear(self):
        """Reset the internal flag to false. Subsequently, coroutines calling
        wait() will block until set() is called to set the internal flag
        to true again."""
        self._value = False

    @coroutine
    def wait(self):
        """Block until the internal flag is true.

        If the internal flag is true on entry, return True
        immediately.  Otherwise, block until another coroutine calls
        set() to set the flag to true, then return True.
        """
        if self._value:
            return True

        fut = futures.Future(loop=self._loop)
        self._waiters.append(fut)
        try:
            yield from fut
            return True
        finally:
            # Runs on normal wakeup and on cancellation alike.
            self._waiters.remove(fut)


class Condition:
    """Asynchronous equivalent to threading.Condition.

    This class implements condition variable objects. A condition variable
    allows one or more coroutines to wait until they are notified by another
    coroutine.

    If no lock is passed in, a new Lock object is created and used as
    the underlying lock.
    """

    def __init__(self, lock=None, *, loop=None):
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

        if lock is None:
            lock = Lock(loop=self._loop)
        elif lock._loop is not self._loop:
            raise ValueError("loop argument must agree with lock")

        self._lock = lock
        # Export the lock's locked(), acquire() and release() methods.
        self.locked = lock.locked
        self.acquire = lock.acquire
        self.release = lock.release

        # Futures of the coroutines blocked in wait(); separate from
        # the queue of coroutines waiting for the underlying lock.
        self._waiters = collections.deque()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self.locked() else 'unlocked'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    @coroutine
    def wait(self):
        """Wait until notified.

        If the calling coroutine has not acquired the lock when this
        method is called, a RuntimeError is raised.

        This method releases the underlying lock, and then blocks
        until it is awakened by a notify() or notify_all() call for
        the same condition variable in another coroutine.  Once
        awakened, it re-acquires the lock and returns True.
        """
        if not self.locked():
            raise RuntimeError('cannot wait on un-acquired lock')

        self.release()
        try:
            fut = futures.Future(loop=self._loop)
            self._waiters.append(fut)
            try:
                yield from fut
                return True
            finally:
                self._waiters.remove(fut)

        finally:
            # Re-acquire the lock on every exit path, including when
            # the wait above was interrupted.
            # NOTE(review): if this re-acquire is itself interrupted
            # (e.g. by a second cancellation), wait() exits without the
            # lock held -- confirm whether callers must tolerate that.
            yield from self.acquire()

    @coroutine
    def wait_for(self, predicate):
        """Wait until a predicate becomes true.

        The predicate should be a callable whose result will be
        interpreted as a boolean value.  The final predicate value is
        the return value.
        """
        result = predicate()
        while not result:
            yield from self.wait()
            result = predicate()
        return result

    def notify(self, n=1):
        """By default, wake up one coroutine waiting on this condition, if any.
        If the calling coroutine has not acquired the lock when this method
        is called, a RuntimeError is raised.

        This method wakes up at most n of the coroutines waiting for the
        condition variable; it is a no-op if no coroutines are waiting.

        Note: an awakened coroutine does not actually return from its
        wait() call until it can reacquire the lock. Since notify() does
        not release the lock, its caller should.
        """
        if not self.locked():
            raise RuntimeError('cannot notify on un-acquired lock')

        idx = 0
        for fut in self._waiters:
            if idx >= n:
                break

            # Only waiters that are still pending count towards n.
            if not fut.done():
                idx += 1
                fut.set_result(False)

    def notify_all(self):
        """Wake up all coroutines waiting on this condition. This method acts
        like notify(), but wakes up all waiting coroutines instead of one. If
        the calling coroutine has not acquired the lock when this method is
        called, a RuntimeError is raised.
        """
        self.notify(len(self._waiters))

    def __enter__(self):
        raise RuntimeError(
            '"yield from" should be used as context manager expression')

    def __exit__(self, *args):
        # Must exist because __enter__ exists, even though that
        # always raises.
        pass

    @coroutine
    def __iter__(self):
        # Enables "with (yield from cond):": acquire the underlying
        # lock, then hand back an object that releases it on exit.
        yield from self.acquire()
        return _ContextManager(self)


class Semaphore:
    """A Semaphore implementation.

    A semaphore manages an internal counter which is decremented by each
    acquire() call and incremented by each release() call. The counter
    can never go below zero; when acquire() finds that it is zero, it blocks,
    waiting until some other coroutine calls release().

    Semaphores also support the context management protocol.

    The optional argument gives the initial value for the internal
    counter; it defaults to 1. If the value given is less than 0,
    ValueError is raised.
    """

    def __init__(self, value=1, *, loop=None):
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._value = value
        # Futures of the coroutines blocked in acquire().  Entries are
        # popped when they are woken; futures cancelled while pending
        # stay here until _wake_up_next() skips over them.
        self._waiters = collections.deque()
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self.locked() else 'unlocked,value:{}'.format(
            self._value)
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    def _wake_up_next(self):
        """Pop waiters until one that is still pending is found; wake it."""
        while self._waiters:
            waiter = self._waiters.popleft()
            if not waiter.done():
                waiter.set_result(True)
                return

    def locked(self):
        """Returns True if semaphore can not be acquired immediately."""
        return self._value == 0

    @coroutine
    def acquire(self):
        """Acquire a semaphore.

        If the internal counter is larger than zero on entry,
        decrement it by one and return True immediately.  If it is
        zero on entry, block, waiting until some other coroutine has
        called release() to make it larger than 0, and then return
        True.

        If the wait is interrupted (for example the calling task is
        cancelled) after release() already woke this waiter, the wakeup
        is handed to the next waiter so that the permit is not lost.
        """
        # Re-check the counter after every wakeup instead of assuming
        # the permit is still available.
        while self._value <= 0:
            fut = futures.Future(loop=self._loop)
            self._waiters.append(fut)
            try:
                yield from fut
            except BaseException:
                # cancel() fails if release() already completed the
                # future; in that case the wakeup meant for us must go
                # to somebody else.
                fut.cancel()
                if self._value > 0 and not fut.cancelled():
                    self._wake_up_next()
                raise
        self._value -= 1
        return True

    def release(self):
        """Release a semaphore, incrementing the internal counter by one.
        When it was zero on entry and another coroutine is waiting for it to
        become larger than zero again, wake up that coroutine.
        """
        self._value += 1
        self._wake_up_next()

    def __enter__(self):
        raise RuntimeError(
            '"yield from" should be used as context manager expression')

    def __exit__(self, *args):
        # Must exist because __enter__ exists, even though that
        # always raises.
        pass

    @coroutine
    def __iter__(self):
        # Enables "with (yield from sem):": acquire the semaphore,
        # then hand back an object that releases it on exit.
        yield from self.acquire()
        return _ContextManager(self)


class BoundedSemaphore(Semaphore):
    """A bounded semaphore implementation.

    This raises ValueError in release() if it would increase the value
    above the initial value.
    """

    def __init__(self, value=1, *, loop=None):
        # Remember the initial value; release() may never push the
        # counter above it.
        self._bound_value = value
        super().__init__(value, loop=loop)

    def release(self):
        if self._value >= self._bound_value:
            raise ValueError('BoundedSemaphore released too many times')
        super().release()