blob: 9e8529249eb73b5e23bab5cb79c07169f76da139 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Synchronization primitives."""
2
Guido van Rossuma58d1c32013-11-24 22:32:09 -08003__all__ = ['Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
5import collections
6
7from . import events
8from . import futures
9from . import tasks
10
11
12class Lock:
13 """Primitive lock objects.
14
15 A primitive lock is a synchronization primitive that is not owned
16 by a particular coroutine when locked. A primitive lock is in one
17 of two states, 'locked' or 'unlocked'.
18
19 It is created in the unlocked state. It has two basic methods,
20 acquire() and release(). When the state is unlocked, acquire()
21 changes the state to locked and returns immediately. When the
22 state is locked, acquire() blocks until a call to release() in
23 another coroutine changes it to unlocked, then the acquire() call
24 resets it to locked and returns. The release() method should only
25 be called in the locked state; it changes the state to unlocked
26 and returns immediately. If an attempt is made to release an
27 unlocked lock, a RuntimeError will be raised.
28
29 When more than one coroutine is blocked in acquire() waiting for
30 the state to turn to unlocked, only one coroutine proceeds when a
31 release() call resets the state to unlocked; first coroutine which
32 is blocked in acquire() is being processed.
33
34 acquire() is a coroutine and should be called with 'yield from'.
35
36 Locks also support the context manager protocol. '(yield from lock)'
37 should be used as context manager expression.
38
39 Usage:
40
41 lock = Lock()
42 ...
43 yield from lock
44 try:
45 ...
46 finally:
47 lock.release()
48
49 Context manager usage:
50
51 lock = Lock()
52 ...
53 with (yield from lock):
54 ...
55
56 Lock objects can be tested for locking state:
57
58 if not lock.locked():
59 yield from lock
60 else:
61 # lock is acquired
62 ...
63
64 """
65
66 def __init__(self, *, loop=None):
67 self._waiters = collections.deque()
68 self._locked = False
69 if loop is not None:
70 self._loop = loop
71 else:
72 self._loop = events.get_event_loop()
73
74 def __repr__(self):
75 res = super().__repr__()
76 extra = 'locked' if self._locked else 'unlocked'
77 if self._waiters:
78 extra = '{},waiters:{}'.format(extra, len(self._waiters))
79 return '<{} [{}]>'.format(res[1:-1], extra)
80
81 def locked(self):
Victor Stinnerc37dd612013-12-02 14:31:16 +010082 """Return True if lock is acquired."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070083 return self._locked
84
85 @tasks.coroutine
86 def acquire(self):
87 """Acquire a lock.
88
89 This method blocks until the lock is unlocked, then sets it to
90 locked and returns True.
91 """
92 if not self._waiters and not self._locked:
93 self._locked = True
94 return True
95
96 fut = futures.Future(loop=self._loop)
97 self._waiters.append(fut)
98 try:
99 yield from fut
100 self._locked = True
101 return True
102 finally:
103 self._waiters.remove(fut)
104
105 def release(self):
106 """Release a lock.
107
108 When the lock is locked, reset it to unlocked, and return.
109 If any other coroutines are blocked waiting for the lock to become
110 unlocked, allow exactly one of them to proceed.
111
112 When invoked on an unlocked lock, a RuntimeError is raised.
113
114 There is no return value.
115 """
116 if self._locked:
117 self._locked = False
118 # Wake up the first waiter who isn't cancelled.
119 for fut in self._waiters:
120 if not fut.done():
121 fut.set_result(True)
122 break
123 else:
124 raise RuntimeError('Lock is not acquired.')
125
126 def __enter__(self):
127 if not self._locked:
128 raise RuntimeError(
129 '"yield from" should be used as context manager expression')
130 return True
131
132 def __exit__(self, *args):
133 self.release()
134
135 def __iter__(self):
136 yield from self.acquire()
137 return self
138
139
140class Event:
Guido van Rossum994bf432013-12-19 12:47:38 -0800141 """Asynchronous equivalent to threading.Event.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700142
143 Class implementing event objects. An event manages a flag that can be set
144 to true with the set() method and reset to false with the clear() method.
145 The wait() method blocks until the flag is true. The flag is initially
146 false.
147 """
148
149 def __init__(self, *, loop=None):
150 self._waiters = collections.deque()
151 self._value = False
152 if loop is not None:
153 self._loop = loop
154 else:
155 self._loop = events.get_event_loop()
156
157 def __repr__(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700158 res = super().__repr__()
Guido van Rossumccea0842013-11-04 13:18:19 -0800159 extra = 'set' if self._value else 'unset'
160 if self._waiters:
161 extra = '{},waiters:{}'.format(extra, len(self._waiters))
162 return '<{} [{}]>'.format(res[1:-1], extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700163
164 def is_set(self):
Victor Stinnerc37dd612013-12-02 14:31:16 +0100165 """Return True if and only if the internal flag is true."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700166 return self._value
167
168 def set(self):
169 """Set the internal flag to true. All coroutines waiting for it to
170 become true are awakened. Coroutine that call wait() once the flag is
171 true will not block at all.
172 """
173 if not self._value:
174 self._value = True
175
176 for fut in self._waiters:
177 if not fut.done():
178 fut.set_result(True)
179
180 def clear(self):
181 """Reset the internal flag to false. Subsequently, coroutines calling
182 wait() will block until set() is called to set the internal flag
183 to true again."""
184 self._value = False
185
186 @tasks.coroutine
187 def wait(self):
188 """Block until the internal flag is true.
189
190 If the internal flag is true on entry, return True
191 immediately. Otherwise, block until another coroutine calls
192 set() to set the flag to true, then return True.
193 """
194 if self._value:
195 return True
196
197 fut = futures.Future(loop=self._loop)
198 self._waiters.append(fut)
199 try:
200 yield from fut
201 return True
202 finally:
203 self._waiters.remove(fut)
204
205
Guido van Rossumccea0842013-11-04 13:18:19 -0800206class Condition:
Guido van Rossum994bf432013-12-19 12:47:38 -0800207 """Asynchronous equivalent to threading.Condition.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700208
209 This class implements condition variable objects. A condition variable
210 allows one or more coroutines to wait until they are notified by another
211 coroutine.
Guido van Rossumccea0842013-11-04 13:18:19 -0800212
213 A new Lock object is created and used as the underlying lock.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700214 """
215
216 def __init__(self, *, loop=None):
Guido van Rossumccea0842013-11-04 13:18:19 -0800217 if loop is not None:
218 self._loop = loop
219 else:
220 self._loop = events.get_event_loop()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700221
Guido van Rossumccea0842013-11-04 13:18:19 -0800222 # Lock as an attribute as in threading.Condition.
223 lock = Lock(loop=self._loop)
224 self._lock = lock
225 # Export the lock's locked(), acquire() and release() methods.
226 self.locked = lock.locked
227 self.acquire = lock.acquire
228 self.release = lock.release
229
230 self._waiters = collections.deque()
231
232 def __repr__(self):
233 res = super().__repr__()
234 extra = 'locked' if self.locked() else 'unlocked'
235 if self._waiters:
236 extra = '{},waiters:{}'.format(extra, len(self._waiters))
237 return '<{} [{}]>'.format(res[1:-1], extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700238
239 @tasks.coroutine
240 def wait(self):
241 """Wait until notified.
242
243 If the calling coroutine has not acquired the lock when this
244 method is called, a RuntimeError is raised.
245
246 This method releases the underlying lock, and then blocks
247 until it is awakened by a notify() or notify_all() call for
248 the same condition variable in another coroutine. Once
249 awakened, it re-acquires the lock and returns True.
250 """
Guido van Rossumccea0842013-11-04 13:18:19 -0800251 if not self.locked():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700252 raise RuntimeError('cannot wait on un-acquired lock')
253
254 keep_lock = True
255 self.release()
256 try:
257 fut = futures.Future(loop=self._loop)
Guido van Rossumccea0842013-11-04 13:18:19 -0800258 self._waiters.append(fut)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700259 try:
260 yield from fut
261 return True
262 finally:
Guido van Rossumccea0842013-11-04 13:18:19 -0800263 self._waiters.remove(fut)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700264
265 except GeneratorExit:
266 keep_lock = False # Prevent yield in finally clause.
267 raise
268 finally:
269 if keep_lock:
270 yield from self.acquire()
271
272 @tasks.coroutine
273 def wait_for(self, predicate):
274 """Wait until a predicate becomes true.
275
276 The predicate should be a callable which result will be
277 interpreted as a boolean value. The final predicate value is
278 the return value.
279 """
280 result = predicate()
281 while not result:
282 yield from self.wait()
283 result = predicate()
284 return result
285
286 def notify(self, n=1):
287 """By default, wake up one coroutine waiting on this condition, if any.
288 If the calling coroutine has not acquired the lock when this method
289 is called, a RuntimeError is raised.
290
291 This method wakes up at most n of the coroutines waiting for the
292 condition variable; it is a no-op if no coroutines are waiting.
293
294 Note: an awakened coroutine does not actually return from its
295 wait() call until it can reacquire the lock. Since notify() does
296 not release the lock, its caller should.
297 """
Guido van Rossumccea0842013-11-04 13:18:19 -0800298 if not self.locked():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700299 raise RuntimeError('cannot notify on un-acquired lock')
300
301 idx = 0
Guido van Rossumccea0842013-11-04 13:18:19 -0800302 for fut in self._waiters:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700303 if idx >= n:
304 break
305
306 if not fut.done():
307 idx += 1
308 fut.set_result(False)
309
310 def notify_all(self):
311 """Wake up all threads waiting on this condition. This method acts
312 like notify(), but wakes up all waiting threads instead of one. If the
313 calling thread has not acquired the lock when this method is called,
314 a RuntimeError is raised.
315 """
Guido van Rossumccea0842013-11-04 13:18:19 -0800316 self.notify(len(self._waiters))
317
318 def __enter__(self):
319 return self._lock.__enter__()
320
321 def __exit__(self, *args):
322 return self._lock.__exit__(*args)
323
324 def __iter__(self):
325 yield from self.acquire()
326 return self
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700327
328
329class Semaphore:
330 """A Semaphore implementation.
331
332 A semaphore manages an internal counter which is decremented by each
333 acquire() call and incremented by each release() call. The counter
334 can never go below zero; when acquire() finds that it is zero, it blocks,
335 waiting until some other thread calls release().
336
337 Semaphores also support the context manager protocol.
338
Guido van Rossum085869b2013-11-23 15:09:16 -0800339 The optional argument gives the initial value for the internal
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700340 counter; it defaults to 1. If the value given is less than 0,
341 ValueError is raised.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700342 """
343
Guido van Rossum085869b2013-11-23 15:09:16 -0800344 def __init__(self, value=1, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700345 if value < 0:
Guido van Rossum9c55a582013-11-21 11:07:45 -0800346 raise ValueError("Semaphore initial value must be >= 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700347 self._value = value
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700348 self._waiters = collections.deque()
Guido van Rossum9c55a582013-11-21 11:07:45 -0800349 self._locked = (value == 0)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700350 if loop is not None:
351 self._loop = loop
352 else:
353 self._loop = events.get_event_loop()
354
355 def __repr__(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700356 res = super().__repr__()
Guido van Rossumccea0842013-11-04 13:18:19 -0800357 extra = 'locked' if self._locked else 'unlocked,value:{}'.format(
358 self._value)
359 if self._waiters:
360 extra = '{},waiters:{}'.format(extra, len(self._waiters))
361 return '<{} [{}]>'.format(res[1:-1], extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700362
363 def locked(self):
364 """Returns True if semaphore can not be acquired immediately."""
365 return self._locked
366
367 @tasks.coroutine
368 def acquire(self):
369 """Acquire a semaphore.
370
371 If the internal counter is larger than zero on entry,
372 decrement it by one and return True immediately. If it is
373 zero on entry, block, waiting until some other coroutine has
374 called release() to make it larger than 0, and then return
375 True.
376 """
377 if not self._waiters and self._value > 0:
378 self._value -= 1
379 if self._value == 0:
380 self._locked = True
381 return True
382
383 fut = futures.Future(loop=self._loop)
384 self._waiters.append(fut)
385 try:
386 yield from fut
387 self._value -= 1
388 if self._value == 0:
389 self._locked = True
390 return True
391 finally:
392 self._waiters.remove(fut)
393
394 def release(self):
395 """Release a semaphore, incrementing the internal counter by one.
396 When it was zero on entry and another coroutine is waiting for it to
397 become larger than zero again, wake up that coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700398 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700399 self._value += 1
400 self._locked = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700401 for waiter in self._waiters:
402 if not waiter.done():
403 waiter.set_result(True)
404 break
405
406 def __enter__(self):
407 # TODO: This is questionable. How do we know the user actually
408 # wrote "with (yield from sema)" instead of "with sema"?
409 return True
410
411 def __exit__(self, *args):
412 self.release()
413
414 def __iter__(self):
415 yield from self.acquire()
416 return self
Guido van Rossum085869b2013-11-23 15:09:16 -0800417
418
419class BoundedSemaphore(Semaphore):
420 """A bounded semaphore implementation.
421
422 This raises ValueError in release() if it would increase the value
423 above the initial value.
424 """
425
426 def __init__(self, value=1, *, loop=None):
427 self._bound_value = value
428 super().__init__(value, loop=loop)
429
430 def release(self):
431 if self._value >= self._bound_value:
432 raise ValueError('BoundedSemaphore released too many times')
433 super().release()