blob: 29c4434a62ad3908e83b17465b4e810c4d1a9323 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Synchronization primitives."""
2
Guido van Rossuma58d1c32013-11-24 22:32:09 -08003__all__ = ['Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
5import collections
6
7from . import events
8from . import futures
9from . import tasks
10
11
class _ContextManager:
    """One-shot helper returned by ``yield from <primitive>``.

    It makes this idiom work for acquiring and releasing a primitive
    around a block:

        with (yield from lock):
            <block>

    whereas the plain form below is rejected by the primitives
    themselves (their own __enter__ raises):

        with lock:
            <block>
    """

    def __init__(self, lock):
        # The primitive is expected to be acquired already; this object
        # is only responsible for releasing it on exit.
        self._lock = lock

    def __enter__(self):
        # Nothing useful can be bound by an "as ..." clause, so the
        # result is deliberately None (implicit return).
        pass

    def __exit__(self, *args):
        # Drop our reference first so the manager cannot be reused, then
        # release.  A second __exit__ fails on the None reference.
        held, self._lock = self._lock, None
        held.release()
40
41
class Lock:
    """Primitive lock objects.

    A primitive lock is a synchronization primitive that is not owned
    by a particular coroutine when locked.  A primitive lock is in one
    of two states, 'locked' or 'unlocked'.

    It is created in the unlocked state.  It has two basic methods,
    acquire() and release().  When the state is unlocked, acquire()
    changes the state to locked and returns immediately.  When the
    state is locked, acquire() blocks until a call to release() in
    another coroutine changes it to unlocked, then the acquire() call
    resets it to locked and returns.  The release() method should only
    be called in the locked state; it changes the state to unlocked
    and returns immediately.  If an attempt is made to release an
    unlocked lock, a RuntimeError will be raised.

    When more than one coroutine is blocked in acquire() waiting for
    the state to turn to unlocked, only one coroutine proceeds when a
    release() call resets the state to unlocked; the coroutine that
    has been blocked in acquire() the longest is the one that proceeds.

    acquire() is a coroutine and should be called with 'yield from'.

    Locks also support the context manager protocol.  '(yield from lock)'
    should be used as the context manager expression.

    Usage:

        lock = Lock()
        ...
        yield from lock.acquire()
        try:
            ...
        finally:
            lock.release()

    Context manager usage:

        lock = Lock()
        ...
        with (yield from lock):
            ...

    Lock objects can be tested for locking state:

        if not lock.locked():
            yield from lock.acquire()
        else:
            # lock is acquired
            ...

    """

    def __init__(self, *, loop=None):
        # Futures of coroutines blocked in acquire(), oldest first.
        self._waiters = collections.deque()
        self._locked = False
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self._locked else 'unlocked'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    def locked(self):
        """Return True if lock is acquired."""
        return self._locked

    def _wake_up_next(self):
        """Hand the free lock to the first waiter that is still pending.

        Cancelled waiters that have not yet removed themselves from the
        queue are skipped.  If some waiter has already been woken (its
        future holds a result) but has not run yet, nothing is done:
        that waiter will either take the lock or, if it is cancelled in
        turn, call this method again itself.  This guarantees that at
        most one waiter is woken per free lock.
        """
        for fut in self._waiters:
            if not fut.done():
                fut.set_result(True)
                return
            if not fut.cancelled():
                # Somebody is already on their way to take the lock.
                return

    @tasks.coroutine
    def acquire(self):
        """Acquire a lock.

        This method blocks until the lock is unlocked, then sets it to
        locked and returns True.

        If the waiting coroutine is cancelled, the cancellation is
        propagated; should the lock be free at that point, the wake-up
        is passed on to the next waiter so it is not lost.
        """
        # Fast path only when nobody is queued, to keep FIFO fairness.
        if not self._waiters and not self._locked:
            self._locked = True
            return True

        fut = futures.Future(loop=self._loop)
        self._waiters.append(fut)
        try:
            try:
                yield from fut
            finally:
                self._waiters.remove(fut)
        except futures.CancelledError:
            # We may have been woken by release() and cancelled before
            # running, or we may have been a stale queue entry that made
            # a later acquire() block.  Either way, if the lock is free
            # somebody else must be given the chance to take it.
            if not self._locked:
                self._wake_up_next()
            raise

        self._locked = True
        return True

    def release(self):
        """Release a lock.

        When the lock is locked, reset it to unlocked, and return.
        If any other coroutines are blocked waiting for the lock to become
        unlocked, allow exactly one of them to proceed.

        When invoked on an unlocked lock, a RuntimeError is raised.

        There is no return value.
        """
        if not self._locked:
            raise RuntimeError('Lock is not acquired.')
        self._locked = False
        # Wake up the first waiter who isn't cancelled.
        self._wake_up_next()

    def __enter__(self):
        raise RuntimeError(
            '"yield from" should be used as context manager expression')

    def __exit__(self, *args):
        # This must exist because __enter__ exists, even though that
        # always raises; that's how the with-statement works.
        pass

    def __iter__(self):
        # This is not a coroutine.  It is meant to enable the idiom:
        #
        #     with (yield from lock):
        #         <block>
        #
        # as an alternative to:
        #
        #     yield from lock.acquire()
        #     try:
        #         <block>
        #     finally:
        #         lock.release()
        yield from self.acquire()
        return _ContextManager(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700180
181
class Event:
    """Coroutine-friendly counterpart of threading.Event.

    An event wraps a single boolean flag.  set() turns the flag on,
    clear() turns it off, and wait() blocks the calling coroutine until
    the flag is on.  A new event starts with the flag off.
    """

    def __init__(self, *, loop=None):
        self._loop = events.get_event_loop() if loop is None else loop
        self._value = False
        # Futures of the coroutines currently blocked in wait().
        self._waiters = collections.deque()

    def __repr__(self):
        state = 'set' if self._value else 'unset'
        pending = len(self._waiters)
        if pending:
            state = '{},waiters:{}'.format(state, pending)
        inner = super().__repr__()[1:-1]
        return '<{} [{}]>'.format(inner, state)

    def is_set(self):
        """Return True if and only if the internal flag is true."""
        return self._value

    def set(self):
        """Turn the internal flag on and wake every waiting coroutine.

        Coroutines that call wait() while the flag is on do not block.
        Calling set() when the flag is already on does nothing.
        """
        if self._value:
            return

        self._value = True
        for waiter in self._waiters:
            # Skip waiters whose future is already done (e.g. cancelled).
            if not waiter.done():
                waiter.set_result(True)

    def clear(self):
        """Turn the internal flag off.

        Coroutines that subsequently call wait() block until set() is
        called again.
        """
        self._value = False

    @tasks.coroutine
    def wait(self):
        """Block until the internal flag is true, then return True.

        Returns immediately when the flag is already on; otherwise the
        caller is suspended until another coroutine calls set().
        """
        if not self._value:
            waiter = futures.Future(loop=self._loop)
            self._waiters.append(waiter)
            try:
                yield from waiter
            finally:
                # Always unregister, whether woken or interrupted.
                self._waiters.remove(waiter)
        return True
246
247
class Condition:
    """Asynchronous equivalent to threading.Condition.

    This class implements condition variable objects.  A condition variable
    allows one or more coroutines to wait until they are notified by another
    coroutine.

    A new Lock object is created and used as the underlying lock.
    """

    def __init__(self, *, loop=None):
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

        # Lock as an attribute as in threading.Condition.
        lock = Lock(loop=self._loop)
        self._lock = lock
        # Export the lock's locked(), acquire() and release() methods.
        self.locked = lock.locked
        self.acquire = lock.acquire
        self.release = lock.release

        # Futures of coroutines blocked in wait(), oldest first.
        self._waiters = collections.deque()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self.locked() else 'unlocked'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    @tasks.coroutine
    def wait(self):
        """Wait until notified.

        If the calling coroutine has not acquired the lock when this
        method is called, a RuntimeError is raised.

        This method releases the underlying lock, and then blocks
        until it is awakened by a notify() or notify_all() call for
        the same condition variable in another coroutine.  Once
        awakened, it re-acquires the lock and returns True.

        The lock is re-acquired before this method exits even when the
        wait is cancelled; in that case CancelledError is raised only
        after the lock is held again, so the caller's matching
        release() remains valid.
        """
        if not self.locked():
            raise RuntimeError('cannot wait on un-acquired lock')

        self.release()
        try:
            fut = futures.Future(loop=self._loop)
            self._waiters.append(fut)
            try:
                yield from fut
                return True
            finally:
                self._waiters.remove(fut)

        finally:
            # The lock must be held again on every exit path.  A
            # cancellation delivered while re-acquiring is remembered and
            # the acquire is retried, otherwise the caller would end up
            # releasing a lock it does not hold.
            cancelled = False
            while True:
                try:
                    yield from self.acquire()
                    break
                except futures.CancelledError:
                    cancelled = True

            if cancelled:
                raise futures.CancelledError

    @tasks.coroutine
    def wait_for(self, predicate):
        """Wait until a predicate becomes true.

        The predicate should be a callable whose result will be
        interpreted as a boolean value.  The final predicate value is
        the return value.
        """
        result = predicate()
        while not result:
            yield from self.wait()
            result = predicate()
        return result

    def notify(self, n=1):
        """By default, wake up one coroutine waiting on this condition, if any.
        If the calling coroutine has not acquired the lock when this method
        is called, a RuntimeError is raised.

        This method wakes up at most n of the coroutines waiting for the
        condition variable; it is a no-op if no coroutines are waiting.

        Note: an awakened coroutine does not actually return from its
        wait() call until it can reacquire the lock.  Since notify() does
        not release the lock, its caller should.
        """
        if not self.locked():
            raise RuntimeError('cannot notify on un-acquired lock')

        idx = 0
        for fut in self._waiters:
            if idx >= n:
                break

            # Waiters whose future is already done do not count
            # against n.
            if not fut.done():
                idx += 1
                fut.set_result(False)

    def notify_all(self):
        """Wake up all coroutines waiting on this condition.  This method
        acts like notify(), but wakes up all waiting coroutines instead of
        one.  If the calling coroutine has not acquired the lock when this
        method is called, a RuntimeError is raised.
        """
        self.notify(len(self._waiters))

    def __enter__(self):
        raise RuntimeError(
            '"yield from" should be used as context manager expression')

    def __exit__(self, *args):
        # Must exist because __enter__ exists, even though that always
        # raises.
        pass

    def __iter__(self):
        # Not a coroutine; enables "with (yield from cond):", mirroring
        # the same idiom on the underlying lock.
        yield from self.acquire()
        return _ContextManager(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700366
367
class Semaphore:
    """A Semaphore implementation.

    A semaphore manages an internal counter which is decremented by each
    acquire() call and incremented by each release() call.  The counter
    can never go below zero; when acquire() finds that it is zero, it blocks,
    waiting until some other coroutine calls release().

    Semaphores also support the context manager protocol.

    The optional argument gives the initial value for the internal
    counter; it defaults to 1.  If the value given is less than 0,
    ValueError is raised.
    """

    def __init__(self, value=1, *, loop=None):
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._value = value
        # Futures of coroutines blocked in acquire(), oldest first.
        self._waiters = collections.deque()
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self.locked() else 'unlocked,value:{}'.format(
            self._value)
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    def locked(self):
        """Returns True if semaphore can not be acquired immediately."""
        return self._value == 0

    def _wake_up_waiters(self):
        """Make sure as many waiters are woken as the counter allows.

        Walks the queue oldest-first.  A waiter that has already been
        woken (its future holds a result) but has not run yet is counted
        as consuming one unit; cancelled entries that have not removed
        themselves yet are skipped; pending waiters are woken while
        units remain.  This never wakes more waiters than the counter
        can satisfy.
        """
        available = self._value
        for fut in self._waiters:
            if available <= 0:
                break
            if not fut.done():
                fut.set_result(True)
                available -= 1
            elif not fut.cancelled():
                # Already woken; it will decrement the counter itself.
                available -= 1

    @tasks.coroutine
    def acquire(self):
        """Acquire a semaphore.

        If the internal counter is larger than zero on entry,
        decrement it by one and return True immediately.  If it is
        zero on entry, block, waiting until some other coroutine has
        called release() to make it larger than 0, and then return
        True.

        If the waiting coroutine is cancelled, the cancellation is
        propagated; any unit of the counter that was meant for it is
        offered to the next waiter so the wake-up is not lost.
        """
        # Fast path only when nobody is queued, to keep FIFO fairness.
        if not self._waiters and self._value > 0:
            self._value -= 1
            return True

        fut = futures.Future(loop=self._loop)
        self._waiters.append(fut)
        try:
            try:
                yield from fut
            finally:
                self._waiters.remove(fut)
        except futures.CancelledError:
            # We may have been woken by release() and then cancelled
            # before running; pass the unused unit on.
            if self._value > 0:
                self._wake_up_waiters()
            raise

        self._value -= 1
        return True

    def release(self):
        """Release a semaphore, incrementing the internal counter by one.
        When it was zero on entry and another coroutine is waiting for it to
        become larger than zero again, wake up that coroutine.
        """
        self._value += 1
        self._wake_up_waiters()

    def __enter__(self):
        raise RuntimeError(
            '"yield from" should be used as context manager expression')

    def __exit__(self, *args):
        # Must exist because __enter__ exists, even though that always
        # raises.
        pass

    def __iter__(self):
        # Not a coroutine; enables "with (yield from sem):" as a shorthand
        # for acquire() followed by release() in a finally clause.
        yield from self.acquire()
        return _ContextManager(self)
Guido van Rossum085869b2013-11-23 15:09:16 -0800450
451
class BoundedSemaphore(Semaphore):
    """Semaphore whose counter may never exceed its initial value.

    release() raises ValueError instead of pushing the counter above
    the value the semaphore was created with.
    """

    def __init__(self, value=1, *, loop=None):
        # Remember the ceiling before the base class validates and
        # stores the starting counter.
        self._bound_value = value
        super().__init__(value, loop=loop)

    def release(self):
        """Release one unit, refusing to go past the initial value."""
        at_ceiling = self._value >= self._bound_value
        if at_ceiling:
            raise ValueError('BoundedSemaphore released too many times')
        super().release()