blob: 06edbbc1d33c78af9a4f33b127401294d0e964d5 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Synchronization primitives."""
2
3__all__ = ['Lock', 'Event', 'Condition', 'Semaphore']
4
5import collections
6
7from . import events
8from . import futures
9from . import tasks
10
11
class Lock:
    """Primitive lock objects.

    A primitive lock is a synchronization primitive that is not owned
    by a particular coroutine when locked.  A primitive lock is in one
    of two states, 'locked' or 'unlocked'.

    It is created in the unlocked state.  It has two basic methods,
    acquire() and release().  When the state is unlocked, acquire()
    changes the state to locked and returns immediately.  When the
    state is locked, acquire() blocks until a call to release() in
    another coroutine changes it to unlocked, then the acquire() call
    resets it to locked and returns.  The release() method should only
    be called in the locked state; it changes the state to unlocked
    and returns immediately.  If an attempt is made to release an
    unlocked lock, a RuntimeError will be raised.

    When more than one coroutine is blocked in acquire() waiting for
    the state to turn to unlocked, only one coroutine proceeds when a
    release() call resets the state to unlocked; the coroutine that
    has been blocked in acquire() the longest is the one that proceeds.

    acquire() is a coroutine and should be called with 'yield from'.

    Locks also support the context manager protocol.  '(yield from lock)'
    should be used as context manager expression.

    Usage:

        lock = Lock()
        ...
        yield from lock
        try:
            ...
        finally:
            lock.release()

    Context manager usage:

        lock = Lock()
        ...
        with (yield from lock):
             ...

    Lock objects can be tested for locking state:

        if not lock.locked():
           yield from lock
        else:
           # lock is acquired
           ...

    """

    def __init__(self, *, loop=None):
        """Create an unlocked lock.

        loop -- event loop used to create waiter futures; when omitted,
        the loop returned by events.get_event_loop() is used.
        """
        # FIFO queue of futures, one per coroutine blocked in acquire().
        self._waiters = collections.deque()
        self._locked = False
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self._locked else 'unlocked'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    def locked(self):
        """Return true if lock is acquired."""
        return self._locked

    @tasks.coroutine
    def acquire(self):
        """Acquire a lock.

        This method blocks until the lock is unlocked, then sets it to
        locked and returns True.
        """
        # Fast path: only taken when nobody is queued, so that a newcomer
        # cannot overtake coroutines that are already waiting.
        if not self._waiters and not self._locked:
            self._locked = True
            return True

        fut = futures.Future(loop=self._loop)
        self._waiters.append(fut)
        acquired = False
        try:
            yield from fut
            self._locked = True
            acquired = True
            return True
        finally:
            self._waiters.remove(fut)
            # release() may already have handed the wake-up to this waiter
            # (its future has a result) when the waiter is cancelled or
            # closed before it could take the lock.  Pass the wake-up on,
            # otherwise the remaining waiters would block forever on an
            # unlocked lock.
            if (not acquired and not self._locked
                    and fut.done() and not fut.cancelled()):
                self._wake_up_first()

    def release(self):
        """Release a lock.

        When the lock is locked, reset it to unlocked, and return.
        If any other coroutines are blocked waiting for the lock to become
        unlocked, allow exactly one of them to proceed.

        When invoked on an unlocked lock, a RuntimeError is raised.

        There is no return value.
        """
        if not self._locked:
            raise RuntimeError('Lock is not acquired.')

        self._locked = False
        self._wake_up_first()

    def _wake_up_first(self):
        """Wake up the first waiter whose future is not done yet."""
        for fut in self._waiters:
            if not fut.done():
                fut.set_result(True)
                break

    def __enter__(self):
        # A plain "with lock:" never acquires the lock, so an unlocked
        # state here means "yield from" was forgotten.
        if not self._locked:
            raise RuntimeError(
                '"yield from" should be used as context manager expression')
        return True

    def __exit__(self, *args):
        self.release()

    def __iter__(self):
        # Makes "yield from lock" acquire the lock and evaluate to the
        # lock itself, which then serves as the context manager.
        yield from self.acquire()
        return self
138
139
class Event:
    """An Event implementation, our equivalent to threading.Event.

    An event manages a boolean flag.  set() turns the flag on, clear()
    turns it off, and wait() blocks until the flag is on.  The flag
    starts out off.
    """

    def __init__(self, *, loop=None):
        """Create an event with the flag off.

        loop -- event loop used to create waiter futures; when omitted,
        the loop returned by events.get_event_loop() is used.
        """
        self._loop = loop if loop is not None else events.get_event_loop()
        self._value = False
        # One future per coroutine currently blocked in wait().
        self._waiters = collections.deque()

    def __repr__(self):
        # TODO: add waiters:N if > 0.
        state = 'set' if self._value else 'unset'
        return '<{} [{}]>'.format(super().__repr__()[1:-1], state)

    def is_set(self):
        """Return true if and only if the internal flag is true."""
        return self._value

    def set(self):
        """Set the internal flag to true and wake every waiting coroutine.

        Coroutines that call wait() while the flag is true do not block
        at all.  Calling set() when the flag is already true does nothing.
        """
        if self._value:
            return

        self._value = True
        for waiter in self._waiters:
            if waiter.done():
                # Already woken or cancelled; nothing to deliver.
                continue
            waiter.set_result(True)

    def clear(self):
        """Reset the internal flag to false.

        Subsequently, coroutines calling wait() will block until set()
        is called to set the internal flag to true again.
        """
        self._value = False

    @tasks.coroutine
    def wait(self):
        """Block until the internal flag is true.

        If the flag is true on entry, return True immediately.
        Otherwise block until another coroutine calls set(), then
        return True.
        """
        if not self._value:
            waiter = futures.Future(loop=self._loop)
            self._waiters.append(waiter)
            try:
                yield from waiter
            finally:
                # Always deregister, including on cancellation.
                self._waiters.remove(waiter)
        return True
202
203
204# TODO: Why is this a Lock subclass? threading.Condition *has* a lock.
class Condition(Lock):
    """A Condition implementation.

    A condition variable lets one or more coroutines wait until another
    coroutine notifies them.  This class is itself the lock that guards
    the condition: acquire()/release() come from Lock, while wait(),
    wait_for(), notify() and notify_all() are added here.
    """

    def __init__(self, *, loop=None):
        """Create an unlocked condition; *loop* is forwarded to Lock."""
        super().__init__(loop=loop)
        # Futures of coroutines blocked in wait(), kept separate from the
        # lock's own _waiters queue.
        self._condition_waiters = collections.deque()

    # TODO: Add __repr__() with len(_condition_waiters).

    @tasks.coroutine
    def wait(self):
        """Wait until notified.

        If the calling coroutine has not acquired the lock when this
        method is called, a RuntimeError is raised.

        The lock is released, then the coroutine blocks until a
        notify() or notify_all() call on this condition wakes it up.
        Once awakened, it re-acquires the lock and returns True.
        """
        if not self._locked:
            raise RuntimeError('cannot wait on un-acquired lock')

        reacquire = True
        self.release()
        try:
            waiter = futures.Future(loop=self._loop)
            self._condition_waiters.append(waiter)
            try:
                yield from waiter
                return True
            finally:
                self._condition_waiters.remove(waiter)

        except GeneratorExit:
            # The generator is being closed; yielding again from the
            # finally clause below is not allowed, so skip re-acquiring.
            reacquire = False
            raise
        finally:
            if reacquire:
                yield from self.acquire()

    @tasks.coroutine
    def wait_for(self, predicate):
        """Wait until a predicate becomes true.

        The predicate should be a callable whose result will be
        interpreted as a boolean value.  The final predicate value is
        the return value.
        """
        while True:
            outcome = predicate()
            if outcome:
                return outcome
            yield from self.wait()

    def notify(self, n=1):
        """By default, wake up one coroutine waiting on this condition, if any.

        If the calling coroutine has not acquired the lock when this
        method is called, a RuntimeError is raised.

        This method wakes up at most n of the coroutines waiting for the
        condition variable; it is a no-op if no coroutines are waiting.

        Note: an awakened coroutine does not actually return from its
        wait() call until it can reacquire the lock.  Since notify() does
        not release the lock, its caller should.
        """
        if not self._locked:
            raise RuntimeError('cannot notify on un-acquired lock')

        woken = 0
        for waiter in self._condition_waiters:
            if woken >= n:
                break
            if waiter.done():
                # Already notified or cancelled; does not count toward n.
                continue
            woken += 1
            waiter.set_result(False)

    def notify_all(self):
        """Wake up all coroutines waiting on this condition.

        This method acts like notify(), but wakes up all waiting
        coroutines instead of one.  If the calling coroutine has not
        acquired the lock when this method is called, a RuntimeError is
        raised.
        """
        pending = len(self._condition_waiters)
        self.notify(pending)
297
298
class Semaphore:
    """A Semaphore implementation.

    A semaphore manages an internal counter which is decremented by each
    acquire() call and incremented by each release() call.  The counter
    can never go below zero; when acquire() finds that it is zero, it
    blocks, waiting until some other coroutine calls release().

    Semaphores also support the context manager protocol.

    The first optional argument gives the initial value for the internal
    counter; it defaults to 1.  If the value given is less than 0,
    ValueError is raised.

    The second optional argument determines whether release() is checked
    against the initial counter value; it defaults to False.  If the
    value given is True and release() is called more times than there
    were successful acquire() calls, ValueError is raised.
    """

    def __init__(self, value=1, bound=False, *, loop=None):
        """Create a semaphore.

        value -- initial counter value; must not be negative.
        bound -- when true, release() refuses to raise the counter above
        the initial value.
        loop -- event loop used to create waiter futures; when omitted,
        the loop returned by events.get_event_loop() is used.

        Raises ValueError if value is negative.
        """
        if value < 0:
            # Zero is a valid initial value; only negatives are rejected.
            raise ValueError("Semaphore initial value must be >= 0")
        self._value = value
        self._bound = bound
        self._bound_value = value
        # FIFO queue of futures, one per coroutine blocked in acquire().
        self._waiters = collections.deque()
        # A semaphore created with a zero counter cannot be acquired
        # immediately, so it starts out locked.
        self._locked = (value == 0)
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        # TODO: add waiters:N if > 0.
        res = super().__repr__()
        return '<{} [{}]>'.format(
            res[1:-1],
            'locked' if self._locked else 'unlocked,value:{}'.format(
                self._value))

    def locked(self):
        """Returns True if semaphore can not be acquired immediately."""
        return self._locked

    @tasks.coroutine
    def acquire(self):
        """Acquire a semaphore.

        If the internal counter is larger than zero on entry,
        decrement it by one and return True immediately.  If it is
        zero on entry, block, waiting until some other coroutine has
        called release() to make it larger than 0, and then return
        True.
        """
        # Fast path: only taken when nobody is queued, so that a newcomer
        # cannot overtake coroutines that are already waiting.
        if not self._waiters and self._value > 0:
            self._value -= 1
            if self._value == 0:
                self._locked = True
            return True

        fut = futures.Future(loop=self._loop)
        self._waiters.append(fut)
        acquired = False
        try:
            yield from fut
            self._value -= 1
            if self._value == 0:
                self._locked = True
            acquired = True
            return True
        finally:
            self._waiters.remove(fut)
            # release() may already have handed the wake-up to this waiter
            # (its future has a result) when the waiter is cancelled or
            # closed before it could decrement the counter.  Pass the
            # wake-up on, otherwise the remaining waiters would stay
            # blocked although the counter is positive.
            if (not acquired and self._value > 0
                    and fut.done() and not fut.cancelled()):
                self._wake_up_next()

    def release(self):
        """Release a semaphore, incrementing the internal counter by one.

        When another coroutine is waiting for the counter to become
        larger than zero again, wake up that coroutine.

        If the semaphore was created with the "bound" parameter true,
        release() checks that the counter does not exceed its initial
        value; if it would, ValueError is raised.
        """
        if self._bound and self._value >= self._bound_value:
            raise ValueError('Semaphore released too many times')

        self._value += 1
        self._locked = False
        self._wake_up_next()

    def _wake_up_next(self):
        """Wake up the first waiter whose future is not done yet."""
        for waiter in self._waiters:
            if not waiter.done():
                waiter.set_result(True)
                break

    def __enter__(self):
        # TODO: This is questionable.  How do we know the user actually
        # wrote "with (yield from sema)" instead of "with sema"?
        return True

    def __exit__(self, *args):
        self.release()

    def __iter__(self):
        # Makes "yield from sema" acquire the semaphore and evaluate to
        # the semaphore itself, which then serves as the context manager.
        yield from self.acquire()
        return self
401 return self