blob: 638501787f594ebd18020e4db35cbbf347323e41 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
# Names exported by "from <this module> import *".  Every public helper
# defined in this module is listed, including the coroutine predicates
# (iscoroutinefunction, iscoroutine) and shield().
__all__ = ['coroutine', 'Task',
           'iscoroutinefunction', 'iscoroutine',
           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
           'wait', 'wait_for', 'as_completed', 'sleep', 'async',
           'gather', 'shield',
           ]
8
9import collections
10import concurrent.futures
11import functools
12import inspect
13import linecache
14import traceback
15import weakref
16
17from . import events
18from . import futures
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070019from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
# If you set _DEBUG to true, @coroutine will wrap the resulting
# generator objects in a CoroWrapper instance (defined below).  That
# instance will log a message when the generator is never iterated
# over, which may happen when you forget to use "yield from" with a
# coroutine call.  Note that the value of the _DEBUG flag is read
# when the decorator is applied, so to be of any use it must be set
# before you define your coroutines.  A downside of using this feature
# is that tracebacks show entries for the CoroWrapper.__next__ method
# when _DEBUG is true.
_DEBUG = False
31
32
class CoroWrapper:
    # Wrapper for coroutine in _DEBUG mode.
    #
    # It forwards the generator protocol (__next__/send/throw/close) to
    # the wrapped generator and, when garbage collected, logs an error
    # if the generator was never started.
    #
    # This description is deliberately a comment and not a docstring: a
    # class docstring would create a class-level ``__doc__`` that
    # conflicts with the '__doc__' entry in __slots__.

    # 'gen' and 'func' hold the wrapped generator and the function that
    # created it.  '__name__' and '__doc__' are slots because the
    # @coroutine decorator assigns them on every wrapper instance.
    # '__weakref__' keeps instances weak-referenceable.
    __slots__ = ['gen', 'func', '__name__', '__doc__', '__weakref__']

    def __init__(self, gen, func):
        """Wrap generator object *gen* created by calling *func*."""
        assert inspect.isgenerator(gen), gen
        self.gen = gen
        self.func = func

    def __iter__(self):
        """Return self; the wrapper is its own iterator."""
        return self

    def __next__(self):
        """Advance the wrapped generator and return what it yields."""
        return next(self.gen)

    def send(self, value):
        """Send *value* into the wrapped generator."""
        return self.gen.send(value)

    def throw(self, exc, value=None, tb=None):
        """Raise an exception inside the wrapped generator.

        Accepts the one-argument form ``throw(exc)`` as well as the
        ``throw(type, value, traceback)`` form of generator.throw().
        """
        if value is None and tb is None:
            # Keep the single-argument call exactly as before.
            return self.gen.throw(exc)
        return self.gen.throw(exc, value, tb)

    def close(self):
        """Close the wrapped generator."""
        return self.gen.close()

    def __del__(self):
        # Be careful: self.gen may not exist if __init__ failed (or was
        # never run), and __del__ must not raise in that case.
        gen = getattr(self, 'gen', None)
        frame = getattr(gen, 'gi_frame', None)
        # f_lasti == -1 means no bytecode of the generator was executed,
        # i.e. nobody ever iterated over it.
        if frame is not None and frame.f_lasti == -1:
            func = self.func
            code = func.__code__
            filename = code.co_filename
            lineno = code.co_firstlineno
            logger.error('Coroutine %r defined at %s:%s was never yielded from',
                         func.__name__, filename, lineno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070067
68
def coroutine(func):
    """Decorator that marks *func* as a coroutine function.

    A plain (non-generator) function is wrapped in a generator that
    delegates to whatever Future or generator the function returns.
    In _DEBUG mode each call returns a CoroWrapper, which logs an error
    if the coroutine is destroyed without ever being yielded from.
    """
    coro = func
    if not inspect.isgeneratorfunction(func):
        @functools.wraps(func)
        def coro(*args, **kwargs):
            outcome = func(*args, **kwargs)
            if isinstance(outcome, futures.Future) or inspect.isgenerator(outcome):
                outcome = yield from outcome
            return outcome

    if _DEBUG:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            wrapped = CoroWrapper(coro(*args, **kwargs), func)
            wrapped.__name__ = coro.__name__
            wrapped.__doc__ = coro.__doc__
            return wrapped
    else:
        wrapper = coro

    # Marker consulted by iscoroutinefunction().
    wrapper._is_coroutine = True
    return wrapper
97
98
def iscoroutinefunction(func):
    """Tell whether *func* carries the marker set by @coroutine.

    Returns the value of the ``_is_coroutine`` attribute, or False when
    the attribute is absent.
    """
    try:
        return func._is_coroutine
    except AttributeError:
        return False
102
103
def iscoroutine(obj):
    """Tell whether *obj* is a coroutine object.

    A coroutine object is either a CoroWrapper instance (produced in
    _DEBUG mode) or a plain generator object.
    """
    if isinstance(obj, CoroWrapper):
        return True
    return inspect.isgenerator(obj)
107
108
class Task(futures.Future):
    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # Weak set containing all tasks alive.
    _all_tasks = weakref.WeakSet()

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        if loop is None:
            loop = events.get_event_loop()
        return {t for t in cls._all_tasks if t._loop is loop}

    def __init__(self, coro, *, loop=None):
        """Wrap coroutine object *coro* and schedule its first step.

        *coro* must be a coroutine object (see iscoroutine()), not a
        coroutine function.  The first call to _step() is queued with
        call_soon() on the task's loop, and the task registers itself
        in the class-wide weak set of tasks.
        """
        assert iscoroutine(coro), repr(coro)  # Not a coroutine function!
        super().__init__(loop=loop)
        self._coro = iter(coro)  # Use the iterator just in case.
        # Future this task is currently waiting on, or None (see the
        # invariant documented at the top of the class).
        self._fut_waiter = None
        # Set by cancel() when the cancellation has to be delivered by
        # the next _step() call.
        self._must_cancel = False
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    def __repr__(self):
        """Return the Future repr, extended with task-specific details.

        A pending task with a cancellation request outstanding is shown
        as CANCELLING instead of PENDING, and the coroutine's name is
        inserted as '(<name>)' before the first '<' of the repr.
        """
        res = super().__repr__()
        if (self._must_cancel and
            self._state == futures._PENDING and
            '<PENDING' in res):
            res = res.replace('<PENDING', '<CANCELLING', 1)
        i = res.find('<')
        if i < 0:
            # No '<' found: append the coroutine name at the end.
            i = len(res)
        res = res[:i] + '(<{}>)'.format(self._coro.__name__) + res[i:]
        return res

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is active, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        frames = []
        f = self._coro.gi_frame
        if f is not None:
            # Walk from the newest frame back towards the oldest one,
            # then reverse so the result is ordered oldest to newest.
            while f is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(f)
                f = f.f_back
            frames.reverse()
        elif self._exception is not None:
            # Traceback entries are already linked oldest to newest.
            tb = self._exception.__traceback__
            while tb is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(tb.tb_frame)
                tb = tb.tb_next
        return frames

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output goes; by default it goes to sys.stderr.
        """
        extracted_list = []
        # File names whose linecache entry has already been validated.
        checked = set()
        for f in self.get_stack(limit=limit):
            lineno = f.f_lineno
            co = f.f_code
            filename = co.co_filename
            name = co.co_name
            if filename not in checked:
                checked.add(filename)
                linecache.checkcache(filename)
            line = linecache.getline(filename, lineno, f.f_globals)
            extracted_list.append((filename, lineno, name, line))
        exc = self._exception
        if not extracted_list:
            print('No stack for %r' % self, file=file)
        elif exc is not None:
            print('Traceback for %r (most recent call last):' % self,
                  file=file)
        else:
            print('Stack for %r (most recent call last):' % self,
                  file=file)
        traceback.print_list(extracted_list, file=file)
        if exc is not None:
            for line in traceback.format_exception_only(exc.__class__, exc):
                print(line, file=file, end='')

    def cancel(self):
        """Request cancellation of this task.

        Returns False if the task is already done.  Otherwise returns
        True after either cancelling the Future the task is waiting on,
        or flagging the task so that the next _step() call delivers a
        CancelledError to the coroutine.
        """
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, value=None, exc=None):
        """Run the coroutine until its next yield or until it finishes.

        *exc*, when given, is thrown into the coroutine; otherwise
        *value* is sent (or next() is used when *value* is None).  The
        outcome is used to complete the task or to arrange the next
        call of _step().
        """
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
        if self._must_cancel:
            # A cancellation request is pending: deliver CancelledError
            # in place of whatever was going to be passed in.
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None
        # Call either coro.throw(exc) or coro.send(value).
        try:
            if exc is not None:
                result = coro.throw(exc)
            elif value is not None:
                result = coro.send(value)
            else:
                result = next(coro)
        except StopIteration as exc:
            # The coroutine returned; its return value is the result.
            self.set_result(exc.value)
        except futures.CancelledError as exc:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            # Record the exception on the task, then let it propagate.
            self.set_exception(exc)
            raise
        else:
            if isinstance(result, futures.Future):
                # Yielded Future must come from Future.__iter__().
                if result._blocking:
                    result._blocking = False
                    result.add_done_callback(self._wakeup)
                    self._fut_waiter = result
                    if self._must_cancel:
                        # cancel() was called while the coroutine was
                        # running; pass the request on to the new waiter.
                        if self._fut_waiter.cancel():
                            self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step, None,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        self = None

    def _wakeup(self, future):
        """Done-callback for the awaited Future: resume the coroutine.

        The future's result is passed to _step() as the value; if
        fetching the result raises, the exception is passed instead.
        """
        try:
            value = future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(None, exc)
        else:
            self._step(value, None)
        self = None  # Needed to break cycles when an exception occurs.
314 self = None # Needed to break cycles when an exception occurs.
315
316
# wait() and as_completed() similar to those in PEP 3148.

# Accepted values for the return_when argument of wait(); they are the
# constants of the same names from concurrent.futures.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
322
323
324@coroutine
325def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
326 """Wait for the Futures and coroutines given by fs to complete.
327
328 Coroutines will be wrapped in Tasks.
329
330 Returns two sets of Future: (done, pending).
331
332 Usage:
333
334 done, pending = yield from asyncio.wait(fs)
335
336 Note: This does not raise TimeoutError! Futures that aren't done
337 when the timeout occurs are returned in the second set.
338 """
339 if not fs:
340 raise ValueError('Set of coroutines/Futures is empty.')
341
342 if loop is None:
343 loop = events.get_event_loop()
344
345 fs = set(async(f, loop=loop) for f in fs)
346
347 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
348 raise ValueError('Invalid return_when value: {}'.format(return_when))
349 return (yield from _wait(fs, timeout, return_when, loop))
350
351
def _release_waiter(waiter, value=True, *args):
    """Complete *waiter* with *value* unless it is already done.

    Any extra positional arguments are accepted and ignored, which lets
    this function be used directly as a callback.
    """
    if waiter.done():
        return
    waiter.set_result(value)
355
356
357@coroutine
358def wait_for(fut, timeout, *, loop=None):
359 """Wait for the single Future or coroutine to complete, with timeout.
360
361 Coroutine will be wrapped in Task.
362
363 Returns result of the Future or coroutine. Raises TimeoutError when
364 timeout occurs.
365
366 Usage:
367
368 result = yield from asyncio.wait_for(fut, 10.0)
369
370 """
371 if loop is None:
372 loop = events.get_event_loop()
373
374 waiter = futures.Future(loop=loop)
375 timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False)
376 cb = functools.partial(_release_waiter, waiter, True)
377
378 fut = async(fut, loop=loop)
379 fut.add_done_callback(cb)
380
381 try:
382 if (yield from waiter):
383 return fut.result()
384 else:
385 fut.remove_done_callback(cb)
386 raise futures.TimeoutError()
387 finally:
388 timeout_handle.cancel()
389
390
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper shared by wait() and as_completed().

    The fs argument must be a non-empty collection of Futures.  Returns
    a (done, pending) pair of sets once the return_when condition is
    met or the timeout (if not None) expires.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = futures.Future(loop=loop)
    if timeout is None:
        timer = None
    else:
        timer = loop.call_later(timeout, _release_waiter, waiter)
    remaining = len(fs)

    def _on_completion(fut):
        nonlocal remaining
        remaining -= 1
        if remaining <= 0 or return_when == FIRST_COMPLETED:
            finished = True
        elif return_when == FIRST_EXCEPTION:
            finished = not fut.cancelled() and fut.exception() is not None
        else:
            finished = False
        if not finished:
            return
        if timer is not None:
            timer.cancel()
        if not waiter.done():
            waiter.set_result(False)

    for fut in fs:
        fut.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timer is not None:
            timer.cancel()

    done = set()
    pending = set()
    for fut in fs:
        fut.remove_done_callback(_on_completion)
        bucket = done if fut.done() else pending
        bucket.add(fut)
    return done, pending
432 return done, pending
433
434
435# This is *not* a @coroutine! It is just an iterator (yielding Futures).
436def as_completed(fs, *, loop=None, timeout=None):
437 """Return an iterator whose values, when waited for, are Futures.
438
439 This differs from PEP 3148; the proper way to use this is:
440
441 for f in as_completed(fs):
442 result = yield from f # The 'yield from' may raise.
443 # Use result.
444
445 Raises TimeoutError if the timeout occurs before all Futures are
446 done.
447
448 Note: The futures 'f' are not necessarily members of fs.
449 """
450 loop = loop if loop is not None else events.get_event_loop()
451 deadline = None if timeout is None else loop.time() + timeout
452 todo = set(async(f, loop=loop) for f in fs)
453 completed = collections.deque()
454
455 @coroutine
456 def _wait_for_one():
457 while not completed:
458 timeout = None
459 if deadline is not None:
460 timeout = deadline - loop.time()
461 if timeout < 0:
462 raise futures.TimeoutError()
463 done, pending = yield from _wait(
464 todo, timeout, FIRST_COMPLETED, loop)
465 # Multiple callers might be waiting for the same events
466 # and getting the same outcome. Dedupe by updating todo.
467 for f in done:
468 if f in todo:
469 todo.remove(f)
470 completed.append(f)
471 f = completed.popleft()
472 return f.result() # May raise.
473
474 for _ in range(len(todo)):
475 yield _wait_for_one()
476
477
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that finishes *delay* seconds from now, returning *result*."""
    waiter = futures.Future(loop=loop)
    timer = waiter._loop.call_later(delay, waiter.set_result, result)
    try:
        value = yield from waiter
    finally:
        # Stop the timer even when the wait is interrupted.
        timer.cancel()
    return value
487
488
489def async(coro_or_future, *, loop=None):
490 """Wrap a coroutine in a future.
491
492 If the argument is a Future, it is returned directly.
493 """
494 if isinstance(coro_or_future, futures.Future):
495 if loop is not None and loop is not coro_or_future._loop:
496 raise ValueError('loop argument must agree with Future')
497 return coro_or_future
498 elif iscoroutine(coro_or_future):
499 return Task(coro_or_future, loop=loop)
500 else:
501 raise TypeError('A Future or coroutine is required')
502
503
class _GatheringFuture(futures.Future):
    """Future returned by gather().

    Its cancel() forwards the request to every child instead of
    marking this future as cancelled right away, which mirrors how
    Task.cancel() behaves.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children

    def cancel(self):
        if not self.done():
            for child in self._children:
                child.cancel()
            return True
        return False
521 return True
522
523
524def gather(*coros_or_futures, loop=None, return_exceptions=False):
525 """Return a future aggregating results from the given coroutines
526 or futures.
527
528 All futures must share the same event loop. If all the tasks are
529 done successfully, the returned future's result is the list of
530 results (in the order of the original sequence, not necessarily
531 the order of results arrival). If *result_exception* is True,
532 exceptions in the tasks are treated the same as successful
533 results, and gathered in the result list; otherwise, the first
534 raised exception will be immediately propagated to the returned
535 future.
536
537 Cancellation: if the outer Future is cancelled, all children (that
538 have not completed yet) are also cancelled. If any child is
539 cancelled, this is treated as if it raised CancelledError --
540 the outer Future is *not* cancelled in this case. (This is to
541 prevent the cancellation of one child to cause other children to
542 be cancelled.)
543 """
544 children = [async(fut, loop=loop) for fut in coros_or_futures]
545 n = len(children)
546 if n == 0:
547 outer = futures.Future(loop=loop)
548 outer.set_result([])
549 return outer
550 if loop is None:
551 loop = children[0]._loop
552 for fut in children:
553 if fut._loop is not loop:
554 raise ValueError("futures are tied to different event loops")
555 outer = _GatheringFuture(children, loop=loop)
556 nfinished = 0
557 results = [None] * n
558
559 def _done_callback(i, fut):
560 nonlocal nfinished
561 if outer._state != futures._PENDING:
562 if fut._exception is not None:
563 # Mark exception retrieved.
564 fut.exception()
565 return
566 if fut._state == futures._CANCELLED:
567 res = futures.CancelledError()
568 if not return_exceptions:
569 outer.set_exception(res)
570 return
571 elif fut._exception is not None:
572 res = fut.exception() # Mark exception retrieved.
573 if not return_exceptions:
574 outer.set_exception(res)
575 return
576 else:
577 res = fut._result
578 results[i] = res
579 nfinished += 1
580 if nfinished == n:
581 outer.set_result(results)
582
583 for i, fut in enumerate(children):
584 fut.add_done_callback(functools.partial(_done_callback, i))
585 return outer
586
587
588def shield(arg, *, loop=None):
589 """Wait for a future, shielding it from cancellation.
590
591 The statement
592
593 res = yield from shield(something())
594
595 is exactly equivalent to the statement
596
597 res = yield from something()
598
599 *except* that if the coroutine containing it is cancelled, the
600 task running in something() is not cancelled. From the POV of
601 something(), the cancellation did not happen. But its caller is
602 still cancelled, so the yield-from expression still raises
603 CancelledError. Note: If something() is cancelled by other means
604 this will still cancel shield().
605
606 If you want to completely ignore cancellation (not recommended)
607 you can combine shield() with a try/except clause, as follows:
608
609 try:
610 res = yield from shield(something())
611 except CancelledError:
612 res = None
613 """
614 inner = async(arg, loop=loop)
615 if inner.done():
616 # Shortcut.
617 return inner
618 loop = inner._loop
619 outer = futures.Future(loop=loop)
620
621 def _done_callback(inner):
622 if outer.cancelled():
623 # Mark inner's result as retrieved.
624 inner.cancelled() or inner.exception()
625 return
626 if inner.cancelled():
627 outer.cancel()
628 else:
629 exc = inner.exception()
630 if exc is not None:
631 outer.set_exception(exc)
632 else:
633 outer.set_result(inner.result())
634
635 inner.add_done_callback(_done_callback)
636 return outer