blob: f1ce7324785ba983b8ba12f3283a70757ad0cc86 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Synchronization primitives."""
2
Yury Selivanov6370f342017-12-10 18:36:12 -05003__all__ = ('Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
5import collections
Andrew Svetlov28d8d142017-12-09 20:00:05 +02006import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007
8from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -07009from . import exceptions
Guido van Rossumab3c8892014-01-25 16:51:57 -080010
11
class _ContextManagerMixin:
    """Mixin that adds ``async with`` support to a lock-like object.

    The class using this mixin must provide an awaitable ``acquire()``
    and a plain (non-coroutine) ``release()``.
    """

    async def __aenter__(self):
        """Acquire the primitive when the ``async with`` block is entered."""
        await self.acquire()
        # There is nothing useful to bind in an "as ..." clause for
        # lock-like objects, so the implicit None return is intentional.

    async def __aexit__(self, exc_type, exc, tb):
        """Release the primitive on exit.

        Returns None, so an exception raised in the block is not
        suppressed.
        """
        self.release()
Yury Selivanovd08c3632015-05-13 15:15:56 -040021
22
class Lock(_ContextManagerMixin):
    """Primitive lock for coroutines.

    The lock has no owner: it is simply 'locked' or 'unlocked', and it
    starts out unlocked.

    acquire() is a coroutine.  On an unlocked lock it switches the state
    to locked and returns at once; on a locked lock it suspends the
    caller until release() is called elsewhere, then takes the lock and
    returns.  release() is a regular method that switches the state back
    to unlocked; calling it on an unlocked lock raises RuntimeError.

    When several coroutines are suspended in acquire(), a single
    release() lets exactly one of them continue: the one that has been
    waiting the longest.

    Locks support the asynchronous context management protocol, so
    'async with lock' is the preferred way to use them.

    Usage:

        lock = Lock()
        ...
        await lock.acquire()
        try:
            ...
        finally:
            lock.release()

    Context manager usage:

        lock = Lock()
        ...
        async with lock:
            ...

    Lock objects can be tested for locking state:

        if not lock.locked():
            await lock.acquire()
        else:
            # lock is acquired
            ...

    """

    def __init__(self, *, loop=None):
        # The deque of waiter futures is only created by acquire() the
        # first time a caller actually has to wait.
        self._waiters = None
        self._locked = False
        if loop is not None:
            self._loop = loop
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        state = 'locked' if self._locked else 'unlocked'
        if self._waiters:
            state = f'{state}, waiters:{len(self._waiters)}'
        base = super().__repr__()
        return f'<{base[1:-1]} [{state}]>'

    def locked(self):
        """Return True if lock is acquired."""
        return self._locked

    async def acquire(self):
        """Acquire the lock.

        Suspends the caller until the lock is unlocked, then marks it
        locked and returns True.
        """
        if not self._locked:
            pending = self._waiters
            # Fast path: the lock is free and every queued waiter (if any)
            # has been cancelled, so nobody is ahead of us.
            if pending is None or all(w.cancelled() for w in pending):
                self._locked = True
                return True

        if self._waiters is None:
            self._waiters = collections.deque()
        waiter = self._loop.create_future()
        self._waiters.append(waiter)

        # The inner ``finally`` runs before the CancelledError handler, so
        # our own future has already left the queue by the time
        # _wake_up_first() is called and we can never wake ourselves up.
        try:
            try:
                await waiter
            finally:
                self._waiters.remove(waiter)
        except exceptions.CancelledError:
            # If the lock is free, hand the wake-up on to the next waiter
            # instead of letting it get lost with this cancelled acquire.
            if not self._locked:
                self._wake_up_first()
            raise

        self._locked = True
        return True

    def release(self):
        """Release the lock.

        Switches a locked lock to unlocked and wakes up the first
        coroutine waiting in acquire(), if there is one.

        Raises RuntimeError when the lock is not currently locked.

        There is no return value.
        """
        if not self._locked:
            raise RuntimeError('Lock is not acquired.')
        self._locked = False
        self._wake_up_first()

    def _wake_up_first(self):
        """Wake up the first waiter if it isn't done."""
        if not self._waiters:
            # No queue yet (None) or nobody waiting.
            return
        head = self._waiters[0]

        # A done head future means that waiter is already on its way to
        # run: it will either take the lock or, if it was cancelled while
        # the lock is still free, come back here and wake the next one.
        if not head.done():
            head.set_result(True)
Mathieu Sornay894a6542017-06-09 22:17:40 +0200162
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700163
class Event:
    """Asynchronous equivalent to threading.Event.

    An event wraps a boolean flag, initially false.  set() makes the flag
    true, clear() makes it false again, and the wait() coroutine suspends
    its caller until the flag is true.
    """

    def __init__(self, *, loop=None):
        self._waiters = collections.deque()
        self._value = False
        if loop is not None:
            self._loop = loop
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        state = 'set' if self._value else 'unset'
        if self._waiters:
            state = f'{state}, waiters:{len(self._waiters)}'
        base = super().__repr__()
        return f'<{base[1:-1]} [{state}]>'

    def is_set(self):
        """Return True if and only if the internal flag is true."""
        return self._value

    def set(self):
        """Set the internal flag to true and wake every pending waiter.

        Coroutines that call wait() while the flag is true do not block.
        Calling set() when the flag is already true does nothing.
        """
        if self._value:
            return
        self._value = True

        for waiter in self._waiters:
            # Futures that are already done (e.g. cancelled) are skipped.
            if not waiter.done():
                waiter.set_result(True)

    def clear(self):
        """Reset the internal flag to false.

        Coroutines that call wait() afterwards block until set() is
        called again.
        """
        self._value = False

    async def wait(self):
        """Block until the internal flag is true.

        Returns True immediately if the flag is already true; otherwise
        suspends until set() is called and then returns True.
        """
        if self._value:
            return True

        waiter = self._loop.create_future()
        self._waiters.append(waiter)
        try:
            await waiter
        finally:
            # Always deregister, including when the wait is cancelled.
            self._waiters.remove(waiter)
        return True
230
231
class Condition(_ContextManagerMixin):
    """Asynchronous equivalent to threading.Condition.

    A condition variable lets one or more coroutines wait until another
    coroutine notifies them.

    If no lock is passed in, a new Lock object is created and used as the
    underlying lock.
    """

    def __init__(self, lock=None, *, loop=None):
        if loop is not None:
            self._loop = loop
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        else:
            self._loop = events.get_event_loop()

        if lock is None:
            # Forward the caller's ``loop`` argument as given (possibly
            # None) rather than self._loop, so Lock only emits its own
            # deprecation warning when a loop was passed explicitly.
            lock = Lock(loop=loop)
        elif lock._loop is not self._loop:
            raise ValueError("loop argument must agree with lock")

        self._lock = lock
        # Re-export the lock's locked(), acquire() and release() as bound
        # methods of the underlying lock.
        self.locked = lock.locked
        self.acquire = lock.acquire
        self.release = lock.release

        self._waiters = collections.deque()

    def __repr__(self):
        state = 'locked' if self.locked() else 'unlocked'
        if self._waiters:
            state = f'{state}, waiters:{len(self._waiters)}'
        base = super().__repr__()
        return f'<{base[1:-1]} [{state}]>'

    async def wait(self):
        """Wait until notified.

        Raises RuntimeError if the underlying lock is not held when this
        method is called.

        Releases the underlying lock, suspends until notify() or
        notify_all() wakes this waiter, then re-acquires the lock and
        returns True.  The lock is re-acquired even if the wait is
        cancelled; in that case CancelledError is raised afterwards.
        """
        if not self.locked():
            raise RuntimeError('cannot wait on un-acquired lock')

        self.release()
        try:
            waiter = self._loop.create_future()
            self._waiters.append(waiter)
            try:
                await waiter
                return True
            finally:
                self._waiters.remove(waiter)

        finally:
            # The lock must be held again on the way out, no matter how
            # the wait ended.  Cancellations that arrive while
            # re-acquiring are remembered and re-raised once the lock is
            # ours.
            interrupted = False
            while True:
                try:
                    await self.acquire()
                except exceptions.CancelledError:
                    interrupted = True
                else:
                    break

            if interrupted:
                raise exceptions.CancelledError

    async def wait_for(self, predicate):
        """Wait until a predicate becomes true.

        predicate is a callable whose result is interpreted as a boolean.
        It is evaluated once up front and again after every wake-up; the
        first truthy result is returned.
        """
        while True:
            outcome = predicate()
            if outcome:
                return outcome
            await self.wait()

    def notify(self, n=1):
        """Wake up at most n coroutines waiting on this condition.

        By default one waiter is woken.  This is a no-op when nobody is
        waiting.  Raises RuntimeError if the underlying lock is not held
        when this method is called.

        Note: a woken coroutine does not return from wait() until it has
        re-acquired the lock, and notify() does not release the lock, so
        the caller is expected to release it.
        """
        if not self.locked():
            raise RuntimeError('cannot notify on un-acquired lock')

        woken = 0
        for waiter in self._waiters:
            if woken >= n:
                break

            # Only waiters that are still pending count towards n.
            if not waiter.done():
                woken += 1
                waiter.set_result(False)

    def notify_all(self):
        """Wake up all coroutines waiting on this condition.

        Behaves like notify(), but with n set to the current number of
        waiters.  Raises RuntimeError if the underlying lock is not held
        when this method is called.
        """
        self.notify(len(self._waiters))
352
Guido van Rossumccea0842013-11-04 13:18:19 -0800353
class Semaphore(_ContextManagerMixin):
    """A Semaphore implementation.

    A semaphore keeps an internal counter.  Each acquire() decrements it
    and each release() increments it.  acquire() never takes the counter
    below zero: when the counter is zero it waits until some other
    coroutine calls release().

    Semaphores also support the asynchronous context management protocol
    ('async with').

    The optional argument gives the initial value for the internal
    counter; it defaults to 1.  If the value given is less than 0,
    ValueError is raised.
    """

    def __init__(self, value=1, *, loop=None):
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._value = value
        self._waiters = collections.deque()
        if loop is not None:
            self._loop = loop
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        if self.locked():
            state = 'locked'
        else:
            state = f'unlocked, value:{self._value}'
        if self._waiters:
            state = f'{state}, waiters:{len(self._waiters)}'
        base = super().__repr__()
        return f'<{base[1:-1]} [{state}]>'

    def _wake_up_next(self):
        """Pop waiters from the queue until one pending waiter is woken."""
        queue = self._waiters
        while queue:
            candidate = queue.popleft()
            if candidate.done():
                # Already finished (e.g. cancelled); drop it and move on.
                continue
            candidate.set_result(None)
            return

    def locked(self):
        """Returns True if semaphore can not be acquired immediately."""
        return self._value == 0

    async def acquire(self):
        """Acquire a semaphore.

        If the internal counter is greater than zero, decrement it and
        return True immediately.  Otherwise wait until release() has made
        it greater than zero, then decrement it and return True.
        """
        while self._value <= 0:
            waiter = self._loop.create_future()
            self._waiters.append(waiter)
            try:
                await waiter
            except BaseException:
                # See the similar code in Queue.get.
                # cancel() leaves an already-completed future untouched,
                # so "not cancelled()" here means this waiter had been
                # woken before the exception arrived; pass that wake-up on
                # to the next waiter if the semaphore is still available.
                waiter.cancel()
                if self._value > 0 and not waiter.cancelled():
                    self._wake_up_next()
                raise
        self._value -= 1
        return True

    def release(self):
        """Release a semaphore, incrementing the internal counter by one.

        If a coroutine is waiting in acquire(), the first pending one is
        woken up.
        """
        self._value += 1
        self._wake_up_next()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700430
Guido van Rossum085869b2013-11-23 15:09:16 -0800431
class BoundedSemaphore(Semaphore):
    """A bounded semaphore implementation.

    This raises ValueError in release() if it would increase the value
    above the initial value.
    """

    def __init__(self, value=1, *, loop=None):
        """Create a semaphore whose counter can never exceed *value*.

        value -- initial (and maximum) counter value; defaults to 1.
        loop -- deprecated; passing anything other than None emits a
            DeprecationWarning attributed to the caller.
        """
        # Test for None explicitly, as the sibling primitives in this
        # module do, so that a falsy-but-not-None loop object is still
        # reported as a use of the deprecated argument.
        if loop is not None:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)

        # Remember the upper bound before the base class initialises the
        # counter (and validates value).
        self._bound_value = value
        super().__init__(value, loop=loop)

    def release(self):
        """Release the semaphore, incrementing the counter by one.

        Raises ValueError if the counter is already at (or above) the
        initial value, i.e. release() was called more often than
        acquire().
        """
        if self._value >= self._bound_value:
            raise ValueError('BoundedSemaphore released too many times')
        super().release()