blob: 6f322c258cfe4f12d5c1ea4dfd384115924c8e1b [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Synchronization primitives."""
2
Yury Selivanov6370f342017-12-10 18:36:12 -05003__all__ = ('Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
5import collections
6
Andrew Svetlov0baa72f2018-09-11 10:13:04 -07007from . import exceptions
Yurii Karabas0ec34ca2020-11-24 20:08:54 +02008from . import mixins
Guido van Rossumab3c8892014-01-25 16:51:57 -08009
10
Yury Selivanovd08c3632015-05-13 15:15:56 -040011class _ContextManagerMixin:
Andrew Svetlov5f841b52017-12-09 00:23:48 +020012 async def __aenter__(self):
13 await self.acquire()
Victor Stinner3f438a92017-11-28 14:43:52 +010014 # We have no use for the "as ..." clause in the with
15 # statement for locks.
16 return None
Yury Selivanovd08c3632015-05-13 15:15:56 -040017
Andrew Svetlov5f841b52017-12-09 00:23:48 +020018 async def __aexit__(self, exc_type, exc, tb):
Victor Stinner3f438a92017-11-28 14:43:52 +010019 self.release()
Yury Selivanovd08c3632015-05-13 15:15:56 -040020
21
Yurii Karabas0ec34ca2020-11-24 20:08:54 +020022class Lock(_ContextManagerMixin, mixins._LoopBoundedMixin):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023 """Primitive lock objects.
24
25 A primitive lock is a synchronization primitive that is not owned
26 by a particular coroutine when locked. A primitive lock is in one
27 of two states, 'locked' or 'unlocked'.
28
29 It is created in the unlocked state. It has two basic methods,
30 acquire() and release(). When the state is unlocked, acquire()
31 changes the state to locked and returns immediately. When the
32 state is locked, acquire() blocks until a call to release() in
33 another coroutine changes it to unlocked, then the acquire() call
34 resets it to locked and returns. The release() method should only
35 be called in the locked state; it changes the state to unlocked
36 and returns immediately. If an attempt is made to release an
37 unlocked lock, a RuntimeError will be raised.
38
39 When more than one coroutine is blocked in acquire() waiting for
40 the state to turn to unlocked, only one coroutine proceeds when a
41 release() call resets the state to unlocked; first coroutine which
42 is blocked in acquire() is being processed.
43
Andrew Svetlov88743422017-12-11 17:35:49 +020044 acquire() is a coroutine and should be called with 'await'.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070045
Andrew Svetlov88743422017-12-11 17:35:49 +020046 Locks also support the asynchronous context management protocol.
47 'async with lock' statement should be used.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070048
49 Usage:
50
51 lock = Lock()
52 ...
Andrew Svetlov88743422017-12-11 17:35:49 +020053 await lock.acquire()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070054 try:
55 ...
56 finally:
57 lock.release()
58
59 Context manager usage:
60
61 lock = Lock()
62 ...
Andrew Svetlov88743422017-12-11 17:35:49 +020063 async with lock:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070064 ...
65
66 Lock objects can be tested for locking state:
67
68 if not lock.locked():
Andrew Svetlov88743422017-12-11 17:35:49 +020069 await lock.acquire()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070070 else:
71 # lock is acquired
72 ...
73
74 """
75
Yurii Karabas0ec34ca2020-11-24 20:08:54 +020076 def __init__(self):
Zackery Spytz9aa78562019-06-05 03:33:27 -060077 self._waiters = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070078 self._locked = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070079
80 def __repr__(self):
81 res = super().__repr__()
82 extra = 'locked' if self._locked else 'unlocked'
83 if self._waiters:
Yury Selivanov6370f342017-12-10 18:36:12 -050084 extra = f'{extra}, waiters:{len(self._waiters)}'
85 return f'<{res[1:-1]} [{extra}]>'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070086
87 def locked(self):
Victor Stinnerc37dd612013-12-02 14:31:16 +010088 """Return True if lock is acquired."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070089 return self._locked
90
Andrew Svetlov5f841b52017-12-09 00:23:48 +020091 async def acquire(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070092 """Acquire a lock.
93
94 This method blocks until the lock is unlocked, then sets it to
95 locked and returns True.
96 """
Zackery Spytz9aa78562019-06-05 03:33:27 -060097 if (not self._locked and (self._waiters is None or
98 all(w.cancelled() for w in self._waiters))):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070099 self._locked = True
100 return True
101
Zackery Spytz9aa78562019-06-05 03:33:27 -0600102 if self._waiters is None:
103 self._waiters = collections.deque()
Yurii Karabas0ec34ca2020-11-24 20:08:54 +0200104 fut = self._get_loop().create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700105 self._waiters.append(fut)
Bar Harel2f79c012018-02-03 00:04:00 +0200106
107 # Finally block should be called before the CancelledError
108 # handling as we don't want CancelledError to call
109 # _wake_up_first() and attempt to wake up itself.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700110 try:
Bar Harel2f79c012018-02-03 00:04:00 +0200111 try:
112 await fut
113 finally:
114 self._waiters.remove(fut)
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700115 except exceptions.CancelledError:
Mathieu Sornay894a6542017-06-09 22:17:40 +0200116 if not self._locked:
117 self._wake_up_first()
118 raise
Bar Harel2f79c012018-02-03 00:04:00 +0200119
120 self._locked = True
121 return True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700122
123 def release(self):
124 """Release a lock.
125
126 When the lock is locked, reset it to unlocked, and return.
127 If any other coroutines are blocked waiting for the lock to become
128 unlocked, allow exactly one of them to proceed.
129
130 When invoked on an unlocked lock, a RuntimeError is raised.
131
132 There is no return value.
133 """
134 if self._locked:
135 self._locked = False
Mathieu Sornay894a6542017-06-09 22:17:40 +0200136 self._wake_up_first()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700137 else:
138 raise RuntimeError('Lock is not acquired.')
139
Mathieu Sornay894a6542017-06-09 22:17:40 +0200140 def _wake_up_first(self):
Bar Harel2f79c012018-02-03 00:04:00 +0200141 """Wake up the first waiter if it isn't done."""
Zackery Spytz9aa78562019-06-05 03:33:27 -0600142 if not self._waiters:
143 return
Bar Harel2f79c012018-02-03 00:04:00 +0200144 try:
145 fut = next(iter(self._waiters))
146 except StopIteration:
147 return
148
149 # .done() necessarily means that a waiter will wake up later on and
150 # either take the lock, or, if it was cancelled and lock wasn't
151 # taken already, will hit this again and wake up a new waiter.
152 if not fut.done():
153 fut.set_result(True)
Mathieu Sornay894a6542017-06-09 22:17:40 +0200154
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700155
Yurii Karabas0ec34ca2020-11-24 20:08:54 +0200156class Event(mixins._LoopBoundedMixin):
Guido van Rossum994bf432013-12-19 12:47:38 -0800157 """Asynchronous equivalent to threading.Event.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700158
159 Class implementing event objects. An event manages a flag that can be set
160 to true with the set() method and reset to false with the clear() method.
161 The wait() method blocks until the flag is true. The flag is initially
162 false.
163 """
164
Yurii Karabas0ec34ca2020-11-24 20:08:54 +0200165 def __init__(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700166 self._waiters = collections.deque()
167 self._value = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700168
169 def __repr__(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700170 res = super().__repr__()
Guido van Rossumccea0842013-11-04 13:18:19 -0800171 extra = 'set' if self._value else 'unset'
172 if self._waiters:
Yury Selivanov6370f342017-12-10 18:36:12 -0500173 extra = f'{extra}, waiters:{len(self._waiters)}'
174 return f'<{res[1:-1]} [{extra}]>'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700175
176 def is_set(self):
Victor Stinnerc37dd612013-12-02 14:31:16 +0100177 """Return True if and only if the internal flag is true."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700178 return self._value
179
180 def set(self):
181 """Set the internal flag to true. All coroutines waiting for it to
182 become true are awakened. Coroutine that call wait() once the flag is
183 true will not block at all.
184 """
185 if not self._value:
186 self._value = True
187
188 for fut in self._waiters:
189 if not fut.done():
190 fut.set_result(True)
191
192 def clear(self):
193 """Reset the internal flag to false. Subsequently, coroutines calling
194 wait() will block until set() is called to set the internal flag
195 to true again."""
196 self._value = False
197
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200198 async def wait(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700199 """Block until the internal flag is true.
200
201 If the internal flag is true on entry, return True
202 immediately. Otherwise, block until another coroutine calls
203 set() to set the flag to true, then return True.
204 """
205 if self._value:
206 return True
207
Yurii Karabas0ec34ca2020-11-24 20:08:54 +0200208 fut = self._get_loop().create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700209 self._waiters.append(fut)
210 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200211 await fut
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700212 return True
213 finally:
214 self._waiters.remove(fut)
215
216
Yurii Karabas0ec34ca2020-11-24 20:08:54 +0200217class Condition(_ContextManagerMixin, mixins._LoopBoundedMixin):
Guido van Rossum994bf432013-12-19 12:47:38 -0800218 """Asynchronous equivalent to threading.Condition.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700219
220 This class implements condition variable objects. A condition variable
221 allows one or more coroutines to wait until they are notified by another
222 coroutine.
Guido van Rossumccea0842013-11-04 13:18:19 -0800223
224 A new Lock object is created and used as the underlying lock.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700225 """
226
Yurii Karabas0ec34ca2020-11-24 20:08:54 +0200227 def __init__(self, lock=None):
Andrew Svetlovf21fcd02014-07-26 17:54:34 +0300228 if lock is None:
Yurii Karabas0ec34ca2020-11-24 20:08:54 +0200229 lock = Lock()
230 elif lock._loop is not self._get_loop():
Andrew Svetlovf21fcd02014-07-26 17:54:34 +0300231 raise ValueError("loop argument must agree with lock")
232
Guido van Rossumccea0842013-11-04 13:18:19 -0800233 self._lock = lock
234 # Export the lock's locked(), acquire() and release() methods.
235 self.locked = lock.locked
236 self.acquire = lock.acquire
237 self.release = lock.release
238
239 self._waiters = collections.deque()
240
241 def __repr__(self):
242 res = super().__repr__()
243 extra = 'locked' if self.locked() else 'unlocked'
244 if self._waiters:
Yury Selivanov6370f342017-12-10 18:36:12 -0500245 extra = f'{extra}, waiters:{len(self._waiters)}'
246 return f'<{res[1:-1]} [{extra}]>'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700247
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200248 async def wait(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700249 """Wait until notified.
250
251 If the calling coroutine has not acquired the lock when this
252 method is called, a RuntimeError is raised.
253
254 This method releases the underlying lock, and then blocks
255 until it is awakened by a notify() or notify_all() call for
256 the same condition variable in another coroutine. Once
257 awakened, it re-acquires the lock and returns True.
258 """
Guido van Rossumccea0842013-11-04 13:18:19 -0800259 if not self.locked():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700260 raise RuntimeError('cannot wait on un-acquired lock')
261
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700262 self.release()
263 try:
Yurii Karabas0ec34ca2020-11-24 20:08:54 +0200264 fut = self._get_loop().create_future()
Guido van Rossumccea0842013-11-04 13:18:19 -0800265 self._waiters.append(fut)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700266 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200267 await fut
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700268 return True
269 finally:
Guido van Rossumccea0842013-11-04 13:18:19 -0800270 self._waiters.remove(fut)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700271
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700272 finally:
Yury Selivanovc92bf832016-06-11 12:00:07 -0400273 # Must reacquire lock even if wait is cancelled
Bar Harel57465102018-02-14 11:18:11 +0200274 cancelled = False
Yury Selivanovc92bf832016-06-11 12:00:07 -0400275 while True:
276 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200277 await self.acquire()
Yury Selivanovc92bf832016-06-11 12:00:07 -0400278 break
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700279 except exceptions.CancelledError:
Bar Harel57465102018-02-14 11:18:11 +0200280 cancelled = True
281
282 if cancelled:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700283 raise exceptions.CancelledError
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700284
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200285 async def wait_for(self, predicate):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700286 """Wait until a predicate becomes true.
287
288 The predicate should be a callable which result will be
289 interpreted as a boolean value. The final predicate value is
290 the return value.
291 """
292 result = predicate()
293 while not result:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200294 await self.wait()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700295 result = predicate()
296 return result
297
298 def notify(self, n=1):
299 """By default, wake up one coroutine waiting on this condition, if any.
300 If the calling coroutine has not acquired the lock when this method
301 is called, a RuntimeError is raised.
302
303 This method wakes up at most n of the coroutines waiting for the
304 condition variable; it is a no-op if no coroutines are waiting.
305
306 Note: an awakened coroutine does not actually return from its
307 wait() call until it can reacquire the lock. Since notify() does
308 not release the lock, its caller should.
309 """
Guido van Rossumccea0842013-11-04 13:18:19 -0800310 if not self.locked():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700311 raise RuntimeError('cannot notify on un-acquired lock')
312
313 idx = 0
Guido van Rossumccea0842013-11-04 13:18:19 -0800314 for fut in self._waiters:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700315 if idx >= n:
316 break
317
318 if not fut.done():
319 idx += 1
320 fut.set_result(False)
321
322 def notify_all(self):
323 """Wake up all threads waiting on this condition. This method acts
324 like notify(), but wakes up all waiting threads instead of one. If the
325 calling thread has not acquired the lock when this method is called,
326 a RuntimeError is raised.
327 """
Guido van Rossumccea0842013-11-04 13:18:19 -0800328 self.notify(len(self._waiters))
329
Guido van Rossumccea0842013-11-04 13:18:19 -0800330
Yurii Karabas0ec34ca2020-11-24 20:08:54 +0200331class Semaphore(_ContextManagerMixin, mixins._LoopBoundedMixin):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700332 """A Semaphore implementation.
333
334 A semaphore manages an internal counter which is decremented by each
335 acquire() call and incremented by each release() call. The counter
336 can never go below zero; when acquire() finds that it is zero, it blocks,
337 waiting until some other thread calls release().
338
Serhiy Storchaka14867992014-09-10 23:43:41 +0300339 Semaphores also support the context management protocol.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700340
Guido van Rossum085869b2013-11-23 15:09:16 -0800341 The optional argument gives the initial value for the internal
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700342 counter; it defaults to 1. If the value given is less than 0,
343 ValueError is raised.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700344 """
345
Yurii Karabas0ec34ca2020-11-24 20:08:54 +0200346 def __init__(self, value=1):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700347 if value < 0:
Guido van Rossum9c55a582013-11-21 11:07:45 -0800348 raise ValueError("Semaphore initial value must be >= 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700349 self._value = value
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700350 self._waiters = collections.deque()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700351
352 def __repr__(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700353 res = super().__repr__()
Yury Selivanov6370f342017-12-10 18:36:12 -0500354 extra = 'locked' if self.locked() else f'unlocked, value:{self._value}'
Guido van Rossumccea0842013-11-04 13:18:19 -0800355 if self._waiters:
Yury Selivanov6370f342017-12-10 18:36:12 -0500356 extra = f'{extra}, waiters:{len(self._waiters)}'
357 return f'<{res[1:-1]} [{extra}]>'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700358
Guido van Rossumd455a502015-09-29 11:54:45 -0700359 def _wake_up_next(self):
360 while self._waiters:
361 waiter = self._waiters.popleft()
362 if not waiter.done():
363 waiter.set_result(None)
364 return
365
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700366 def locked(self):
367 """Returns True if semaphore can not be acquired immediately."""
Guido van Rossumab3c8892014-01-25 16:51:57 -0800368 return self._value == 0
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700369
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200370 async def acquire(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700371 """Acquire a semaphore.
372
373 If the internal counter is larger than zero on entry,
374 decrement it by one and return True immediately. If it is
375 zero on entry, block, waiting until some other coroutine has
376 called release() to make it larger than 0, and then return
377 True.
378 """
Guido van Rossumd455a502015-09-29 11:54:45 -0700379 while self._value <= 0:
Yurii Karabas0ec34ca2020-11-24 20:08:54 +0200380 fut = self._get_loop().create_future()
Guido van Rossumd455a502015-09-29 11:54:45 -0700381 self._waiters.append(fut)
382 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200383 await fut
Guido van Rossumd455a502015-09-29 11:54:45 -0700384 except:
385 # See the similar code in Queue.get.
386 fut.cancel()
387 if self._value > 0 and not fut.cancelled():
388 self._wake_up_next()
389 raise
390 self._value -= 1
391 return True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700392
393 def release(self):
394 """Release a semaphore, incrementing the internal counter by one.
395 When it was zero on entry and another coroutine is waiting for it to
396 become larger than zero again, wake up that coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700397 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700398 self._value += 1
Guido van Rossumd455a502015-09-29 11:54:45 -0700399 self._wake_up_next()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700400
Guido van Rossum085869b2013-11-23 15:09:16 -0800401
402class BoundedSemaphore(Semaphore):
403 """A bounded semaphore implementation.
404
405 This raises ValueError in release() if it would increase the value
406 above the initial value.
407 """
408
Yurii Karabas0ec34ca2020-11-24 20:08:54 +0200409 def __init__(self, value=1):
Guido van Rossum085869b2013-11-23 15:09:16 -0800410 self._bound_value = value
Yurii Karabas0ec34ca2020-11-24 20:08:54 +0200411 super().__init__(value)
Guido van Rossum085869b2013-11-23 15:09:16 -0800412
413 def release(self):
414 if self._value >= self._bound_value:
415 raise ValueError('BoundedSemaphore released too many times')
416 super().release()