# Source blob: a7453fb1c77287898bedeaf3828daafe49b0f013
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Synchronization primitives."""
2
Yury Selivanov6370f342017-12-10 18:36:12 -05003__all__ = ('Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
5import collections
6
Andrew Svetlov0baa72f2018-09-11 10:13:04 -07007from . import exceptions
Yurii Karabas0ec34ca2020-11-24 20:08:54 +02008from . import mixins
Guido van Rossumab3c8892014-01-25 16:51:57 -08009
10
class _ContextManagerMixin:
    """Mixin adding ``async with`` support on top of acquire()/release().

    The host class must supply an awaitable ``acquire()`` and a plain
    (non-awaited) ``release()``.
    """

    async def __aenter__(self):
        """Wait for acquire() to complete when the block is entered."""
        await self.acquire()
        # Nothing useful can be bound by an "as ..." clause for these
        # primitives, so the coroutine simply finishes with None.

    async def __aexit__(self, exc_type, exc, tb):
        """Call release() when the block is left.

        The result is None, so an exception raised inside the block is
        not suppressed.
        """
        self.release()
Yury Selivanovd08c3632015-05-13 15:15:56 -040020
21
class Lock(_ContextManagerMixin, mixins._LoopBoundMixin):
    """Primitive lock objects.

    A primitive lock is a synchronization primitive that is not owned
    by a particular coroutine when locked.  A primitive lock is in one
    of two states, 'locked' or 'unlocked'.

    It is created in the unlocked state.  It has two basic methods,
    acquire() and release().  When the state is unlocked, acquire()
    changes the state to locked and returns immediately.  When the
    state is locked, acquire() blocks until a call to release() in
    another coroutine changes it to unlocked, then the acquire() call
    resets it to locked and returns.  The release() method should only
    be called in the locked state; it changes the state to unlocked
    and returns immediately.  If an attempt is made to release an
    unlocked lock, a RuntimeError will be raised.

    When more than one coroutine is blocked in acquire() waiting for
    the state to turn to unlocked, only one coroutine proceeds when a
    release() call resets the state to unlocked; the coroutine that
    started waiting in acquire() first is the one that proceeds.

    acquire() is a coroutine and should be called with 'await'.

    Locks also support the asynchronous context management protocol;
    use the 'async with lock' statement.

    Usage:

        lock = Lock()
        ...
        await lock.acquire()
        try:
            ...
        finally:
            lock.release()

    Context manager usage:

        lock = Lock()
        ...
        async with lock:
            ...

    Lock objects can be tested for locking state:

        if not lock.locked():
            await lock.acquire()
        else:
            # lock is acquired
            ...

    """

    def __init__(self, *, loop=mixins._marker):
        # 'loop' is passed through untouched to the loop-bound mixin.
        super().__init__(loop=loop)
        # The waiter deque is created lazily, the first time acquire()
        # actually has to queue a future.
        self._waiters = None
        self._locked = False

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self._locked else 'unlocked'
        if self._waiters:
            extra = f'{extra}, waiters:{len(self._waiters)}'
        # Strip the surrounding '<' and '>' of the inherited repr and
        # append the lock state in square brackets.
        return f'<{res[1:-1]} [{extra}]>'

    def locked(self):
        """Return True if lock is acquired."""
        return self._locked

    async def acquire(self):
        """Acquire a lock.

        This method blocks until the lock is unlocked, then sets it to
        locked and returns True.

        If the wait is interrupted by CancelledError, the error is
        re-raised after the next waiter (if any) has been given a chance
        to take a lock that is currently free.
        """
        # Fast path: the lock is free and nobody with a live (not
        # cancelled) future is queued ahead of us.
        if (not self._locked and (self._waiters is None or
                all(w.cancelled() for w in self._waiters))):
            self._locked = True
            return True

        if self._waiters is None:
            self._waiters = collections.deque()
        fut = self._get_loop().create_future()
        self._waiters.append(fut)

        # Finally block should be called before the CancelledError
        # handling as we don't want CancelledError to call
        # _wake_up_first() and attempt to wake up itself.
        try:
            try:
                await fut
            finally:
                self._waiters.remove(fut)
        except exceptions.CancelledError:
            # We may have been woken by release() and cancelled before
            # running; pass the wake-up on so the lock is not left idle
            # while others are still queued.
            if not self._locked:
                self._wake_up_first()
            raise

        self._locked = True
        return True

    def release(self):
        """Release a lock.

        When the lock is locked, reset it to unlocked, and return.
        If any other coroutines are blocked waiting for the lock to become
        unlocked, allow exactly one of them to proceed.

        When invoked on an unlocked lock, a RuntimeError is raised.

        There is no return value.
        """
        if self._locked:
            self._locked = False
            self._wake_up_first()
        else:
            raise RuntimeError('Lock is not acquired.')

    def _wake_up_first(self):
        """Wake up the first waiter if it isn't done."""
        if not self._waiters:
            # Covers both "deque never created" (None) and "deque empty".
            return
        # The guard above guarantees at least one element, so the head
        # can be read directly.
        fut = self._waiters[0]

        # .done() necessarily means that a waiter will wake up later on and
        # either take the lock, or, if it was cancelled and lock wasn't
        # taken already, will hit this again and wake up a new waiter.
        if not fut.done():
            fut.set_result(True)
Mathieu Sornay894a6542017-06-09 22:17:40 +0200155
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700156
class Event(mixins._LoopBoundMixin):
    """Asynchronous equivalent to threading.Event.

    An event wraps a boolean flag.  set() turns the flag on, clear() turns
    it off, and wait() blocks the caller until the flag is on.  A new event
    starts with the flag off.
    """

    def __init__(self, *, loop=mixins._marker):
        super().__init__(loop=loop)
        self._value = False
        self._waiters = collections.deque()

    def __repr__(self):
        base = super().__repr__()
        details = ['set' if self._value else 'unset']
        if self._waiters:
            details.append(f'waiters:{len(self._waiters)}')
        # Drop the enclosing '<' '>' of the inherited repr before
        # appending the state summary.
        return f'<{base[1:-1]} [{", ".join(details)}]>'

    def is_set(self):
        """Return True if and only if the internal flag is true."""
        return self._value

    def set(self):
        """Set the internal flag to true and wake every waiting coroutine.

        Coroutines that call wait() while the flag is true do not block.
        Calling set() on an event whose flag is already true does nothing.
        """
        if self._value:
            return

        self._value = True
        for waiter in self._waiters:
            if waiter.done():
                continue
            waiter.set_result(True)

    def clear(self):
        """Reset the internal flag to false.

        Coroutines that call wait() afterwards block until set() is
        called again.
        """
        self._value = False

    async def wait(self):
        """Block until the internal flag is true, then return True.

        When the flag is already true on entry, True is returned
        immediately.  Otherwise the caller is suspended until another
        coroutine calls set().
        """
        if self._value:
            return True

        waiter = self._get_loop().create_future()
        self._waiters.append(waiter)
        try:
            await waiter
        finally:
            # Runs on normal wake-up and on cancellation alike.
            self._waiters.remove(waiter)
        return True
217
218
class Condition(_ContextManagerMixin, mixins._LoopBoundMixin):
    """Asynchronous equivalent to threading.Condition.

    This class implements condition variable objects. A condition variable
    allows one or more coroutines to wait until they are notified by another
    coroutine.

    If no lock is passed in, a new Lock object is created and used as the
    underlying lock.
    """

    def __init__(self, lock=None, *, loop=mixins._marker):
        # 'loop' is passed through untouched to the loop-bound mixin.
        super().__init__(loop=loop)
        if lock is None:
            lock = Lock()
        # NOTE: the lock's private ``_loop`` attribute is deliberately not
        # compared with this condition's loop.  Lock only calls
        # _get_loop() once acquire() has to block, so a freshly created
        # Lock handed in from a running coroutine presumably has no loop
        # recorded yet and such a comparison would reject it with
        # ValueError.

        self._lock = lock
        # Export the lock's locked(), acquire() and release() methods.
        self.locked = lock.locked
        self.acquire = lock.acquire
        self.release = lock.release

        self._waiters = collections.deque()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self.locked() else 'unlocked'
        if self._waiters:
            extra = f'{extra}, waiters:{len(self._waiters)}'
        # Strip the surrounding '<' and '>' of the inherited repr and
        # append the state in square brackets.
        return f'<{res[1:-1]} [{extra}]>'

    async def wait(self):
        """Wait until notified.

        If the calling coroutine has not acquired the lock when this
        method is called, a RuntimeError is raised.

        This method releases the underlying lock, and then blocks
        until it is awakened by a notify() or notify_all() call for
        the same condition variable in another coroutine.  Once
        awakened, it re-acquires the lock and returns True.

        If the wait is cancelled, the lock is still re-acquired before
        CancelledError is raised to the caller.
        """
        if not self.locked():
            raise RuntimeError('cannot wait on un-acquired lock')

        self.release()
        try:
            fut = self._get_loop().create_future()
            self._waiters.append(fut)
            try:
                await fut
                return True
            finally:
                self._waiters.remove(fut)

        finally:
            # Must reacquire lock even if wait is cancelled
            cancelled = False
            while True:
                try:
                    await self.acquire()
                    break
                except exceptions.CancelledError:
                    # Remember the cancellation but keep trying: the
                    # caller must hold the lock when wait() exits.
                    cancelled = True

            if cancelled:
                raise exceptions.CancelledError

    async def wait_for(self, predicate):
        """Wait until a predicate becomes true.

        The predicate should be a callable whose result will be
        interpreted as a boolean value.  The final predicate value is
        the return value.
        """
        result = predicate()
        while not result:
            await self.wait()
            result = predicate()
        return result

    def notify(self, n=1):
        """By default, wake up one coroutine waiting on this condition, if any.
        If the calling coroutine has not acquired the lock when this method
        is called, a RuntimeError is raised.

        This method wakes up at most n of the coroutines waiting for the
        condition variable; it is a no-op if no coroutines are waiting.

        Note: an awakened coroutine does not actually return from its
        wait() call until it can reacquire the lock.  Since notify() does
        not release the lock, its caller should.
        """
        if not self.locked():
            raise RuntimeError('cannot notify on un-acquired lock')

        # Counts only the waiters actually woken; futures that are
        # already done do not use up any of the n wake-ups.
        idx = 0
        for fut in self._waiters:
            if idx >= n:
                break

            if not fut.done():
                idx += 1
                fut.set_result(False)

    def notify_all(self):
        """Wake up all coroutines waiting on this condition.  This method
        acts like notify(), but wakes up all waiting coroutines instead of
        one.  If the calling coroutine has not acquired the lock when this
        method is called, a RuntimeError is raised.
        """
        self.notify(len(self._waiters))
332
Guido van Rossumccea0842013-11-04 13:18:19 -0800333
class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin):
    """A Semaphore implementation.

    A semaphore keeps an internal counter.  Each acquire() call lowers
    the counter by one and each release() call raises it by one.  The
    counter never drops below zero: an acquire() that finds it at zero
    blocks until some other coroutine calls release().

    Semaphores also support the context management protocol.

    The optional argument is the starting value of the counter and
    defaults to 1.  A value below 0 raises ValueError.
    """

    def __init__(self, value=1, *, loop=mixins._marker):
        super().__init__(loop=loop)
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._waiters = collections.deque()
        self._value = value

    def __repr__(self):
        base = super().__repr__()
        if self.locked():
            status = 'locked'
        else:
            status = f'unlocked, value:{self._value}'
        if self._waiters:
            status += f', waiters:{len(self._waiters)}'
        # Drop the enclosing '<' '>' of the inherited repr before
        # appending the state summary.
        return f'<{base[1:-1]} [{status}]>'

    def locked(self):
        """Returns True if semaphore can not be acquired immediately."""
        return self._value == 0

    def _wake_up_next(self):
        """Pop queued futures until one that is not done has been woken."""
        queue = self._waiters
        while queue:
            candidate = queue.popleft()
            if candidate.done():
                # Already finished or cancelled: discard and keep looking.
                continue
            candidate.set_result(None)
            break

    async def acquire(self):
        """Acquire a semaphore.

        When the internal counter is above zero on entry, lower it by
        one and return True at once.  When it is zero, block until some
        other coroutine has called release() to raise it above zero,
        and then return True.
        """
        while self._value <= 0:
            waiter = self._get_loop().create_future()
            self._waiters.append(waiter)
            try:
                await waiter
            except BaseException:
                # See the similar code in Queue.get.
                waiter.cancel()
                # If cancel() did not take effect the future had already
                # been given a result, so hand that wake-up to the next
                # waiter when a slot is available.
                if self._value > 0 and not waiter.cancelled():
                    self._wake_up_next()
                raise
        self._value -= 1
        return True

    def release(self):
        """Release a semaphore, raising the internal counter by one.

        When a coroutine is waiting for the counter to rise above zero,
        one such coroutine is woken up.
        """
        self._value += 1
        self._wake_up_next()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700404
Guido van Rossum085869b2013-11-23 15:09:16 -0800405
class BoundedSemaphore(Semaphore):
    """A bounded semaphore implementation.

    release() raises ValueError when the call would push the counter
    above the value the semaphore was created with.
    """

    def __init__(self, value=1, *, loop=mixins._marker):
        # Record the ceiling first, then let Semaphore validate the
        # value and set up the counter.
        self._bound_value = value
        super().__init__(value, loop=loop)

    def release(self):
        """Release the semaphore unless it is already at its initial value."""
        at_ceiling = self._value >= self._bound_value
        if at_ceiling:
            raise ValueError('BoundedSemaphore released too many times')
        super().release()
420 super().release()