blob: b943e9dd4f9704f5661ec683824efa5c7cdd3d73 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Synchronization primitives."""
2
Guido van Rossuma58d1c32013-11-24 22:32:09 -08003__all__ = ['Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
5import collections
6
7from . import events
8from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +02009from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010
11
Guido van Rossumab3c8892014-01-25 16:51:57 -080012class _ContextManager:
13 """Context manager.
14
15 This enables the following idiom for acquiring and releasing a
16 lock around a block:
17
18 with (yield from lock):
19 <block>
20
21 while failing loudly when accidentally using:
22
23 with lock:
24 <block>
25 """
26
27 def __init__(self, lock):
28 self._lock = lock
29
30 def __enter__(self):
31 # We have no use for the "as ..." clause in the with
32 # statement for locks.
33 return None
34
35 def __exit__(self, *args):
36 try:
37 self._lock.release()
38 finally:
39 self._lock = None # Crudely prevent reuse.
40
41
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070042class Lock:
43 """Primitive lock objects.
44
45 A primitive lock is a synchronization primitive that is not owned
46 by a particular coroutine when locked. A primitive lock is in one
47 of two states, 'locked' or 'unlocked'.
48
49 It is created in the unlocked state. It has two basic methods,
50 acquire() and release(). When the state is unlocked, acquire()
51 changes the state to locked and returns immediately. When the
52 state is locked, acquire() blocks until a call to release() in
53 another coroutine changes it to unlocked, then the acquire() call
54 resets it to locked and returns. The release() method should only
55 be called in the locked state; it changes the state to unlocked
56 and returns immediately. If an attempt is made to release an
57 unlocked lock, a RuntimeError will be raised.
58
59 When more than one coroutine is blocked in acquire() waiting for
60 the state to turn to unlocked, only one coroutine proceeds when a
61 release() call resets the state to unlocked; first coroutine which
62 is blocked in acquire() is being processed.
63
64 acquire() is a coroutine and should be called with 'yield from'.
65
Serhiy Storchaka14867992014-09-10 23:43:41 +030066 Locks also support the context management protocol. '(yield from lock)'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070067 should be used as context manager expression.
68
69 Usage:
70
71 lock = Lock()
72 ...
73 yield from lock
74 try:
75 ...
76 finally:
77 lock.release()
78
79 Context manager usage:
80
81 lock = Lock()
82 ...
83 with (yield from lock):
84 ...
85
86 Lock objects can be tested for locking state:
87
88 if not lock.locked():
89 yield from lock
90 else:
91 # lock is acquired
92 ...
93
94 """
95
96 def __init__(self, *, loop=None):
97 self._waiters = collections.deque()
98 self._locked = False
99 if loop is not None:
100 self._loop = loop
101 else:
102 self._loop = events.get_event_loop()
103
104 def __repr__(self):
105 res = super().__repr__()
106 extra = 'locked' if self._locked else 'unlocked'
107 if self._waiters:
108 extra = '{},waiters:{}'.format(extra, len(self._waiters))
109 return '<{} [{}]>'.format(res[1:-1], extra)
110
111 def locked(self):
Victor Stinnerc37dd612013-12-02 14:31:16 +0100112 """Return True if lock is acquired."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700113 return self._locked
114
Victor Stinnerf951d282014-06-29 00:46:45 +0200115 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700116 def acquire(self):
117 """Acquire a lock.
118
119 This method blocks until the lock is unlocked, then sets it to
120 locked and returns True.
121 """
122 if not self._waiters and not self._locked:
123 self._locked = True
124 return True
125
126 fut = futures.Future(loop=self._loop)
127 self._waiters.append(fut)
128 try:
129 yield from fut
130 self._locked = True
131 return True
132 finally:
133 self._waiters.remove(fut)
134
135 def release(self):
136 """Release a lock.
137
138 When the lock is locked, reset it to unlocked, and return.
139 If any other coroutines are blocked waiting for the lock to become
140 unlocked, allow exactly one of them to proceed.
141
142 When invoked on an unlocked lock, a RuntimeError is raised.
143
144 There is no return value.
145 """
146 if self._locked:
147 self._locked = False
148 # Wake up the first waiter who isn't cancelled.
149 for fut in self._waiters:
150 if not fut.done():
151 fut.set_result(True)
152 break
153 else:
154 raise RuntimeError('Lock is not acquired.')
155
156 def __enter__(self):
Guido van Rossumab3c8892014-01-25 16:51:57 -0800157 raise RuntimeError(
158 '"yield from" should be used as context manager expression')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700159
160 def __exit__(self, *args):
Guido van Rossumab3c8892014-01-25 16:51:57 -0800161 # This must exist because __enter__ exists, even though that
162 # always raises; that's how the with-statement works.
163 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700164
165 def __iter__(self):
Guido van Rossumab3c8892014-01-25 16:51:57 -0800166 # This is not a coroutine. It is meant to enable the idiom:
167 #
168 # with (yield from lock):
169 # <block>
170 #
171 # as an alternative to:
172 #
173 # yield from lock.acquire()
174 # try:
175 # <block>
176 # finally:
177 # lock.release()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700178 yield from self.acquire()
Guido van Rossumab3c8892014-01-25 16:51:57 -0800179 return _ContextManager(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700180
181
182class Event:
Guido van Rossum994bf432013-12-19 12:47:38 -0800183 """Asynchronous equivalent to threading.Event.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700184
185 Class implementing event objects. An event manages a flag that can be set
186 to true with the set() method and reset to false with the clear() method.
187 The wait() method blocks until the flag is true. The flag is initially
188 false.
189 """
190
191 def __init__(self, *, loop=None):
192 self._waiters = collections.deque()
193 self._value = False
194 if loop is not None:
195 self._loop = loop
196 else:
197 self._loop = events.get_event_loop()
198
199 def __repr__(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700200 res = super().__repr__()
Guido van Rossumccea0842013-11-04 13:18:19 -0800201 extra = 'set' if self._value else 'unset'
202 if self._waiters:
203 extra = '{},waiters:{}'.format(extra, len(self._waiters))
204 return '<{} [{}]>'.format(res[1:-1], extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700205
206 def is_set(self):
Victor Stinnerc37dd612013-12-02 14:31:16 +0100207 """Return True if and only if the internal flag is true."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700208 return self._value
209
210 def set(self):
211 """Set the internal flag to true. All coroutines waiting for it to
212 become true are awakened. Coroutine that call wait() once the flag is
213 true will not block at all.
214 """
215 if not self._value:
216 self._value = True
217
218 for fut in self._waiters:
219 if not fut.done():
220 fut.set_result(True)
221
222 def clear(self):
223 """Reset the internal flag to false. Subsequently, coroutines calling
224 wait() will block until set() is called to set the internal flag
225 to true again."""
226 self._value = False
227
Victor Stinnerf951d282014-06-29 00:46:45 +0200228 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700229 def wait(self):
230 """Block until the internal flag is true.
231
232 If the internal flag is true on entry, return True
233 immediately. Otherwise, block until another coroutine calls
234 set() to set the flag to true, then return True.
235 """
236 if self._value:
237 return True
238
239 fut = futures.Future(loop=self._loop)
240 self._waiters.append(fut)
241 try:
242 yield from fut
243 return True
244 finally:
245 self._waiters.remove(fut)
246
247
Guido van Rossumccea0842013-11-04 13:18:19 -0800248class Condition:
Guido van Rossum994bf432013-12-19 12:47:38 -0800249 """Asynchronous equivalent to threading.Condition.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700250
251 This class implements condition variable objects. A condition variable
252 allows one or more coroutines to wait until they are notified by another
253 coroutine.
Guido van Rossumccea0842013-11-04 13:18:19 -0800254
255 A new Lock object is created and used as the underlying lock.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700256 """
257
Andrew Svetlovf21fcd02014-07-26 17:54:34 +0300258 def __init__(self, lock=None, *, loop=None):
Guido van Rossumccea0842013-11-04 13:18:19 -0800259 if loop is not None:
260 self._loop = loop
261 else:
262 self._loop = events.get_event_loop()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700263
Andrew Svetlovf21fcd02014-07-26 17:54:34 +0300264 if lock is None:
265 lock = Lock(loop=self._loop)
266 elif lock._loop is not self._loop:
267 raise ValueError("loop argument must agree with lock")
268
Guido van Rossumccea0842013-11-04 13:18:19 -0800269 self._lock = lock
270 # Export the lock's locked(), acquire() and release() methods.
271 self.locked = lock.locked
272 self.acquire = lock.acquire
273 self.release = lock.release
274
275 self._waiters = collections.deque()
276
277 def __repr__(self):
278 res = super().__repr__()
279 extra = 'locked' if self.locked() else 'unlocked'
280 if self._waiters:
281 extra = '{},waiters:{}'.format(extra, len(self._waiters))
282 return '<{} [{}]>'.format(res[1:-1], extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700283
Victor Stinnerf951d282014-06-29 00:46:45 +0200284 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700285 def wait(self):
286 """Wait until notified.
287
288 If the calling coroutine has not acquired the lock when this
289 method is called, a RuntimeError is raised.
290
291 This method releases the underlying lock, and then blocks
292 until it is awakened by a notify() or notify_all() call for
293 the same condition variable in another coroutine. Once
294 awakened, it re-acquires the lock and returns True.
295 """
Guido van Rossumccea0842013-11-04 13:18:19 -0800296 if not self.locked():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700297 raise RuntimeError('cannot wait on un-acquired lock')
298
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700299 self.release()
300 try:
301 fut = futures.Future(loop=self._loop)
Guido van Rossumccea0842013-11-04 13:18:19 -0800302 self._waiters.append(fut)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700303 try:
304 yield from fut
305 return True
306 finally:
Guido van Rossumccea0842013-11-04 13:18:19 -0800307 self._waiters.remove(fut)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700308
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700309 finally:
Guido van Rossum2407f3b2014-01-10 13:25:38 -0800310 yield from self.acquire()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700311
Victor Stinnerf951d282014-06-29 00:46:45 +0200312 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700313 def wait_for(self, predicate):
314 """Wait until a predicate becomes true.
315
316 The predicate should be a callable which result will be
317 interpreted as a boolean value. The final predicate value is
318 the return value.
319 """
320 result = predicate()
321 while not result:
322 yield from self.wait()
323 result = predicate()
324 return result
325
326 def notify(self, n=1):
327 """By default, wake up one coroutine waiting on this condition, if any.
328 If the calling coroutine has not acquired the lock when this method
329 is called, a RuntimeError is raised.
330
331 This method wakes up at most n of the coroutines waiting for the
332 condition variable; it is a no-op if no coroutines are waiting.
333
334 Note: an awakened coroutine does not actually return from its
335 wait() call until it can reacquire the lock. Since notify() does
336 not release the lock, its caller should.
337 """
Guido van Rossumccea0842013-11-04 13:18:19 -0800338 if not self.locked():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700339 raise RuntimeError('cannot notify on un-acquired lock')
340
341 idx = 0
Guido van Rossumccea0842013-11-04 13:18:19 -0800342 for fut in self._waiters:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700343 if idx >= n:
344 break
345
346 if not fut.done():
347 idx += 1
348 fut.set_result(False)
349
350 def notify_all(self):
351 """Wake up all threads waiting on this condition. This method acts
352 like notify(), but wakes up all waiting threads instead of one. If the
353 calling thread has not acquired the lock when this method is called,
354 a RuntimeError is raised.
355 """
Guido van Rossumccea0842013-11-04 13:18:19 -0800356 self.notify(len(self._waiters))
357
358 def __enter__(self):
Guido van Rossumab3c8892014-01-25 16:51:57 -0800359 raise RuntimeError(
360 '"yield from" should be used as context manager expression')
Guido van Rossumccea0842013-11-04 13:18:19 -0800361
362 def __exit__(self, *args):
Guido van Rossumab3c8892014-01-25 16:51:57 -0800363 pass
Guido van Rossumccea0842013-11-04 13:18:19 -0800364
365 def __iter__(self):
Guido van Rossumab3c8892014-01-25 16:51:57 -0800366 # See comment in Lock.__iter__().
Guido van Rossumccea0842013-11-04 13:18:19 -0800367 yield from self.acquire()
Guido van Rossumab3c8892014-01-25 16:51:57 -0800368 return _ContextManager(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700369
370
371class Semaphore:
372 """A Semaphore implementation.
373
374 A semaphore manages an internal counter which is decremented by each
375 acquire() call and incremented by each release() call. The counter
376 can never go below zero; when acquire() finds that it is zero, it blocks,
377 waiting until some other thread calls release().
378
Serhiy Storchaka14867992014-09-10 23:43:41 +0300379 Semaphores also support the context management protocol.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700380
Guido van Rossum085869b2013-11-23 15:09:16 -0800381 The optional argument gives the initial value for the internal
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700382 counter; it defaults to 1. If the value given is less than 0,
383 ValueError is raised.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700384 """
385
Guido van Rossum085869b2013-11-23 15:09:16 -0800386 def __init__(self, value=1, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700387 if value < 0:
Guido van Rossum9c55a582013-11-21 11:07:45 -0800388 raise ValueError("Semaphore initial value must be >= 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700389 self._value = value
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700390 self._waiters = collections.deque()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700391 if loop is not None:
392 self._loop = loop
393 else:
394 self._loop = events.get_event_loop()
395
396 def __repr__(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700397 res = super().__repr__()
Guido van Rossumab3c8892014-01-25 16:51:57 -0800398 extra = 'locked' if self.locked() else 'unlocked,value:{}'.format(
Guido van Rossumccea0842013-11-04 13:18:19 -0800399 self._value)
400 if self._waiters:
401 extra = '{},waiters:{}'.format(extra, len(self._waiters))
402 return '<{} [{}]>'.format(res[1:-1], extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700403
404 def locked(self):
405 """Returns True if semaphore can not be acquired immediately."""
Guido van Rossumab3c8892014-01-25 16:51:57 -0800406 return self._value == 0
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700407
Victor Stinnerf951d282014-06-29 00:46:45 +0200408 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700409 def acquire(self):
410 """Acquire a semaphore.
411
412 If the internal counter is larger than zero on entry,
413 decrement it by one and return True immediately. If it is
414 zero on entry, block, waiting until some other coroutine has
415 called release() to make it larger than 0, and then return
416 True.
417 """
418 if not self._waiters and self._value > 0:
419 self._value -= 1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700420 return True
421
422 fut = futures.Future(loop=self._loop)
423 self._waiters.append(fut)
424 try:
425 yield from fut
426 self._value -= 1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700427 return True
428 finally:
429 self._waiters.remove(fut)
430
431 def release(self):
432 """Release a semaphore, incrementing the internal counter by one.
433 When it was zero on entry and another coroutine is waiting for it to
434 become larger than zero again, wake up that coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700435 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700436 self._value += 1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700437 for waiter in self._waiters:
438 if not waiter.done():
439 waiter.set_result(True)
440 break
441
442 def __enter__(self):
Guido van Rossumab3c8892014-01-25 16:51:57 -0800443 raise RuntimeError(
444 '"yield from" should be used as context manager expression')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700445
446 def __exit__(self, *args):
Guido van Rossumab3c8892014-01-25 16:51:57 -0800447 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700448
449 def __iter__(self):
Guido van Rossumab3c8892014-01-25 16:51:57 -0800450 # See comment in Lock.__iter__().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700451 yield from self.acquire()
Guido van Rossumab3c8892014-01-25 16:51:57 -0800452 return _ContextManager(self)
Guido van Rossum085869b2013-11-23 15:09:16 -0800453
454
455class BoundedSemaphore(Semaphore):
456 """A bounded semaphore implementation.
457
458 This raises ValueError in release() if it would increase the value
459 above the initial value.
460 """
461
462 def __init__(self, value=1, *, loop=None):
463 self._bound_value = value
464 super().__init__(value, loop=loop)
465
466 def release(self):
467 if self._value >= self._bound_value:
468 raise ValueError('BoundedSemaphore released too many times')
469 super().release()