blob: ac851e5cc4b4c5022f60c421a7df0549b58684c4 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Synchronization primitives."""
2
3__all__ = ['Lock', 'Event', 'Condition', 'Semaphore']
4
5import collections
6
7from . import events
8from . import futures
9from . import tasks
10
11
class Lock:
    """Primitive lock objects.

    A primitive lock is a synchronization primitive that is not owned
    by a particular coroutine when locked.  A primitive lock is in one
    of two states, 'locked' or 'unlocked'.

    It is created in the unlocked state.  It has two basic methods,
    acquire() and release().  When the state is unlocked (and nobody is
    already queued for the lock), acquire() changes the state to locked
    and returns immediately.  When the state is locked, acquire() blocks
    until a call to release() in another coroutine changes it to
    unlocked, then the acquire() call resets it to locked and returns.
    The release() method should only be called in the locked state; it
    changes the state to unlocked and returns immediately.  If an
    attempt is made to release an unlocked lock, a RuntimeError will be
    raised.

    When more than one coroutine is blocked in acquire() waiting for
    the state to turn to unlocked, only one coroutine proceeds when a
    release() call resets the state to unlocked; waiters are woken in
    the order in which they started waiting.

    acquire() is a coroutine and should be called with 'yield from'.

    Locks also support the context manager protocol.  '(yield from lock)'
    should be used as the context manager expression.

    Usage:

        lock = Lock()
        ...
        yield from lock
        try:
            ...
        finally:
            lock.release()

    Context manager usage:

        lock = Lock()
        ...
        with (yield from lock):
             ...

    Lock objects can be tested for locking state:

        if not lock.locked():
           yield from lock
        else:
           # lock is acquired
           ...

    """

    def __init__(self, *, loop=None):
        """Create an unlocked lock.

        loop is the event loop used to create the futures that waiting
        coroutines block on; when omitted, events.get_event_loop() is
        used.
        """
        # Futures of coroutines blocked in acquire(), oldest first.
        self._waiters = collections.deque()
        self._locked = False
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self._locked else 'unlocked'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        # res[1:-1] strips the surrounding '<' and '>' of the default repr.
        return '<{} [{}]>'.format(res[1:-1], extra)

    def locked(self):
        """Return true if lock is acquired."""
        return self._locked

    @tasks.coroutine
    def acquire(self):
        """Acquire a lock.

        This method blocks until the lock is unlocked, then sets it to
        locked and returns True.

        If the calling coroutine is interrupted (for example cancelled)
        after release() has already woken it but before it could take
        the lock, the wake-up is passed on to the next waiter so that
        the remaining waiters are not left blocked on an unlocked lock.
        """
        # Fast path: only when nobody is queued, so that a newcomer
        # cannot overtake a waiter that has been woken but not resumed.
        if not self._waiters and not self._locked:
            self._locked = True
            return True

        fut = futures.Future(loop=self._loop)
        self._waiters.append(fut)
        acquired = False
        try:
            yield from fut
            self._locked = True
            acquired = True
            return True
        finally:
            self._waiters.remove(fut)
            # fut has a result only if release() picked this waiter.
            # Leaving without the lock in that case would swallow the
            # wake-up, so hand it over to the next pending waiter.
            if (not acquired and not self._locked
                    and fut.done() and not fut.cancelled()):
                self._wake_up_next()

    def release(self):
        """Release a lock.

        When the lock is locked, reset it to unlocked, and return.
        If any other coroutines are blocked waiting for the lock to become
        unlocked, allow exactly one of them to proceed.

        When invoked on an unlocked lock, a RuntimeError is raised.

        There is no return value.
        """
        if not self._locked:
            raise RuntimeError('Lock is not acquired.')

        self._locked = False
        self._wake_up_next()

    def _wake_up_next(self):
        """Wake up the first waiter whose future is not done, if any."""
        for fut in self._waiters:
            if not fut.done():
                fut.set_result(True)
                break

    def __enter__(self):
        # The lock must already be held here: '(yield from lock)' acquires
        # it via __iter__() before the with statement calls __enter__().
        if not self._locked:
            raise RuntimeError(
                '"yield from" should be used as context manager expression')
        return True

    def __exit__(self, *args):
        self.release()

    def __iter__(self):
        # Makes 'yield from lock' acquire the lock and evaluate to the
        # lock itself, so the result can be used as a context manager.
        yield from self.acquire()
        return self
138
139
class Event:
    """Coroutine counterpart of threading.Event.

    An event wraps a boolean flag which starts out false.  set() turns
    the flag on, clear() turns it off again, and wait() blocks the
    calling coroutine until the flag is on.
    """

    def __init__(self, *, loop=None):
        # Futures of the coroutines currently blocked in wait().
        self._waiters = collections.deque()
        self._value = False
        self._loop = events.get_event_loop() if loop is None else loop

    def __repr__(self):
        state = 'set' if self._value else 'unset'
        pending = len(self._waiters)
        if pending:
            state = '{},waiters:{}'.format(state, pending)
        # Drop the enclosing angle brackets of the default repr.
        inner = super().__repr__()[1:-1]
        return '<{} [{}]>'.format(inner, state)

    def is_set(self):
        """Tell whether the internal flag is currently true."""
        return self._value

    def set(self):
        """Turn the internal flag on and wake every waiting coroutine.

        Coroutines that call wait() while the flag is on do not block.
        Calling set() on an event that is already set does nothing.
        """
        if self._value:
            return

        self._value = True
        for waiter in self._waiters:
            # Futures that are already done (e.g. cancelled) are skipped.
            if not waiter.done():
                waiter.set_result(True)

    def clear(self):
        """Turn the internal flag off.

        Later wait() calls block until set() is called again.
        """
        self._value = False

    @tasks.coroutine
    def wait(self):
        """Block until the internal flag is true, then return True.

        Returns True at once when the flag is already on; otherwise the
        caller is suspended until another coroutine calls set().
        """
        if self._value:
            return True

        waiter = futures.Future(loop=self._loop)
        self._waiters.append(waiter)
        try:
            yield from waiter
        finally:
            # Always unregister, whether woken normally or interrupted.
            self._waiters.remove(waiter)
        return True
204
205
class Condition:
    """Coroutine counterpart of threading.Condition.

    A condition variable lets one or more coroutines wait until another
    coroutine notifies them.

    Each Condition creates its own Lock and uses it as the underlying
    lock.
    """

    def __init__(self, *, loop=None):
        self._loop = events.get_event_loop() if loop is None else loop

        # Keep the lock as an attribute, like threading.Condition does,
        # and re-export its locked(), acquire() and release() methods.
        self._lock = Lock(loop=self._loop)
        self.locked = self._lock.locked
        self.acquire = self._lock.acquire
        self.release = self._lock.release

        # Futures of the coroutines currently blocked in wait().
        self._waiters = collections.deque()

    def __repr__(self):
        state = 'locked' if self.locked() else 'unlocked'
        pending = len(self._waiters)
        if pending:
            state = '{},waiters:{}'.format(state, pending)
        # Drop the enclosing angle brackets of the default repr.
        inner = super().__repr__()[1:-1]
        return '<{} [{}]>'.format(inner, state)

    @tasks.coroutine
    def wait(self):
        """Wait until notified.

        Raises RuntimeError when the underlying lock is not held at the
        time of the call.

        The underlying lock is released and the caller is suspended
        until notify() or notify_all() wakes it.  The lock is then
        acquired again and True is returned.
        """
        if not self.locked():
            raise RuntimeError('cannot wait on un-acquired lock')

        reacquire = True
        self.release()
        try:
            waiter = futures.Future(loop=self._loop)
            self._waiters.append(waiter)
            try:
                yield from waiter
            finally:
                self._waiters.remove(waiter)
            return True
        except GeneratorExit:
            # The generator is being closed; yielding from the finally
            # clause below is not allowed, so skip re-acquiring.
            reacquire = False
            raise
        finally:
            if reacquire:
                yield from self.acquire()

    @tasks.coroutine
    def wait_for(self, predicate):
        """Wait until predicate() yields a true value.

        predicate is a callable whose result is interpreted as a
        boolean.  It is evaluated once up front and again after every
        wait(); the first true result is returned.
        """
        while True:
            outcome = predicate()
            if outcome:
                return outcome
            yield from self.wait()

    def notify(self, n=1):
        """Wake up at most n coroutines waiting on this condition.

        By default a single waiter is woken.  Nothing happens when no
        coroutine is waiting.  Raises RuntimeError when the underlying
        lock is not held at the time of the call.

        Note: a woken coroutine does not return from wait() until it
        has acquired the lock again.  notify() does not release the
        lock, so the caller has to do that.
        """
        if not self.locked():
            raise RuntimeError('cannot notify on un-acquired lock')

        remaining = n
        for waiter in self._waiters:
            if remaining <= 0:
                break
            if waiter.done():
                # Already finished (e.g. cancelled): does not count.
                continue
            waiter.set_result(False)
            remaining -= 1

    def notify_all(self):
        """Wake up all coroutines waiting on this condition.

        Behaves like notify() with n set to the current number of
        waiters, including the RuntimeError raised when the lock is
        not held.
        """
        self.notify(len(self._waiters))

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)

    def __iter__(self):
        # 'yield from condition' acquires the underlying lock and
        # evaluates to the condition itself.
        yield from self.acquire()
        return self
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700327
328
class Semaphore:
    """A Semaphore implementation.

    A semaphore manages an internal counter which is decremented by each
    acquire() call and incremented by each release() call.  The counter
    can never go below zero; when acquire() finds that it is zero, it
    blocks, waiting until some other coroutine calls release().

    Semaphores also support the context manager protocol.

    The first optional argument gives the initial value for the internal
    counter; it defaults to 1.  If the value given is less than 0,
    ValueError is raised.

    The second optional argument, bound, defaults to False.  When it is
    true, release() raises ValueError instead of letting the counter
    grow beyond its initial value.
    """

    def __init__(self, value=1, bound=False, *, loop=None):
        if value < 0:
            # Zero is accepted, so the message must say ">= 0".
            raise ValueError("Semaphore initial value must be >= 0")
        self._value = value
        self._bound = bound
        # Upper limit enforced by release() when bound is true.
        self._bound_value = value
        # Futures of coroutines blocked in acquire(), oldest first.
        self._waiters = collections.deque()
        # A semaphore created with a zero counter cannot be acquired
        # immediately, so it starts out locked.
        self._locked = (value == 0)
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self._locked else 'unlocked,value:{}'.format(
            self._value)
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        # res[1:-1] strips the surrounding '<' and '>' of the default repr.
        return '<{} [{}]>'.format(res[1:-1], extra)

    def locked(self):
        """Returns True if semaphore can not be acquired immediately."""
        return self._locked

    @tasks.coroutine
    def acquire(self):
        """Acquire a semaphore.

        If the internal counter is larger than zero on entry (and no
        other coroutine is already queued), decrement it by one and
        return True immediately.  Otherwise block, waiting until some
        other coroutine has called release(), and then decrement the
        counter and return True.

        If the calling coroutine is interrupted (for example cancelled)
        after release() has already woken it but before it could
        decrement the counter, the wake-up is passed on to the next
        waiter so that the released slot is not left unused while
        other coroutines keep waiting.
        """
        if not self._waiters and self._value > 0:
            self._value -= 1
            if self._value == 0:
                self._locked = True
            return True

        fut = futures.Future(loop=self._loop)
        self._waiters.append(fut)
        acquired = False
        try:
            yield from fut
            self._value -= 1
            if self._value == 0:
                self._locked = True
            acquired = True
            return True
        finally:
            self._waiters.remove(fut)
            # fut has a result only if release() picked this waiter.
            # Leaving without decrementing the counter in that case
            # would swallow the wake-up, so pass it to the next waiter.
            if (not acquired and self._value > 0
                    and fut.done() and not fut.cancelled()):
                self._wake_up_next()

    def release(self):
        """Release a semaphore, incrementing the internal counter by one.

        If another coroutine is waiting for the counter to become larger
        than zero, wake up one such coroutine.

        If the semaphore was created with a true "bound" argument and
        the counter has already reached its initial value, ValueError
        is raised and the counter is left unchanged.
        """
        if self._bound and self._value >= self._bound_value:
            raise ValueError('Semaphore released too many times')

        self._value += 1
        self._locked = False
        self._wake_up_next()

    def _wake_up_next(self):
        """Wake up the first waiter whose future is not done, if any."""
        for waiter in self._waiters:
            if not waiter.done():
                waiter.set_result(True)
                break

    def __enter__(self):
        # TODO: This is questionable.  How do we know the user actually
        # wrote "with (yield from sema)" instead of "with sema"?
        return True

    def __exit__(self, *args):
        self.release()

    def __iter__(self):
        # 'yield from sema' acquires the semaphore and evaluates to the
        # semaphore itself, so the result can be used in a with statement.
        yield from self.acquire()
        return self