blob: 999e9629bc692c172440296f0744e778b47fe0b9 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
# Public names exported by a star-import of this module.
__all__ = ['coroutine', 'Task',
           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
           'wait', 'wait_for', 'as_completed', 'sleep', 'async',
           'gather', 'shield',
           ]
8
9import collections
10import concurrent.futures
11import functools
12import inspect
13import linecache
14import traceback
15import weakref
16
17from . import events
18from . import futures
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070019from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
# If you set _DEBUG to true, @coroutine will wrap the resulting
# generator objects in a CoroWrapper instance (defined below).  That
# instance will log a message when the generator is never iterated
# over, which may happen when you forget to use "yield from" with a
# coroutine call.  Note that the value of the _DEBUG flag is taken
# when the decorator is used, so to be of any use it must be set
# before you define your coroutines.  A downside of using this feature
# is that tracebacks show entries for the CoroWrapper.__next__ method
# when _DEBUG is true.
_DEBUG = False
31
32
class CoroWrapper:
    # Wrapper for coroutine in _DEBUG mode.
    #
    # It forwards the generator protocol (__next__/send/throw/close) to the
    # wrapped generator and, on destruction, logs an error if the generator
    # was never started.
    #
    # This is deliberately a comment and not a docstring: '__doc__' is listed
    # in __slots__ (the coroutine() decorator assigns it per instance), and a
    # class-level docstring would conflict with that slot.

    # '__name__' and '__doc__' are assigned by the coroutine() decorator;
    # '__weakref__' keeps instances weak-referenceable despite the slots.
    __slots__ = ['gen', 'func', '__name__', '__doc__', '__weakref__']

    def __init__(self, gen, func):
        """Wrap generator *gen*, which was produced by calling *func*."""
        assert inspect.isgenerator(gen), gen
        self.gen = gen
        self.func = func

    def __iter__(self):
        """Return self; the wrapper is its own iterator."""
        return self

    def __next__(self):
        """Advance the wrapped generator and return the yielded value."""
        return next(self.gen)

    def send(self, value):
        """Send *value* into the wrapped generator."""
        return self.gen.send(value)

    def throw(self, exc, *args):
        """Raise an exception inside the wrapped generator.

        Accepts both call forms of generator.throw(): a single exception
        instance, or the (type, value, traceback) form.  All arguments are
        forwarded unchanged.
        """
        return self.gen.throw(exc, *args)

    def close(self):
        """Close the wrapped generator."""
        return self.gen.close()

    def __del__(self):
        # __init__ may have failed before 'gen' was assigned; in that case
        # there is nothing to report.
        gen = getattr(self, 'gen', None)
        if gen is None:
            return
        # GEN_CREATED means the generator body never started running, i.e.
        # nobody iterated over (or "yielded from") the coroutine.
        if inspect.getgeneratorstate(gen) == inspect.GEN_CREATED:
            func = self.func
            code = func.__code__
            filename = code.co_filename
            lineno = code.co_firstlineno
            logger.error(
                'Coroutine %r defined at %s:%s was never yielded from',
                func.__name__, filename, lineno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070068
69
def coroutine(func):
    """Mark *func* as a coroutine function.

    A generator function is used as is.  Any other callable is wrapped
    in a generator that calls it and, when the call returns a Future or
    a generator, delegates to that result with "yield from".

    When the module-level _DEBUG flag is true at decoration time, each
    call returns a CoroWrapper around the generator, so that an error is
    logged if the coroutine is destroyed without ever being yielded
    from.

    The returned callable carries ``_is_coroutine = True``, which is
    what iscoroutinefunction() checks.
    """
    coro = func
    if not inspect.isgeneratorfunction(func):
        # Plain callable: turn it into a generator function.
        @functools.wraps(func)
        def coro(*args, **kw):
            outcome = func(*args, **kw)
            if inspect.isgenerator(outcome) or isinstance(outcome,
                                                          futures.Future):
                outcome = yield from outcome
            return outcome

    wrapper = coro
    if _DEBUG:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            wrapped = CoroWrapper(coro(*args, **kwds), func)
            wrapped.__name__ = coro.__name__
            wrapped.__doc__ = coro.__doc__
            return wrapped

    wrapper._is_coroutine = True  # For iscoroutinefunction().
    return wrapper
98
99
def iscoroutinefunction(func):
    """Tell whether *func* carries the marker set by @coroutine.

    Returns the value of ``func._is_coroutine`` when that attribute
    exists, and False otherwise.
    """
    try:
        return func._is_coroutine
    except AttributeError:
        return False
103
104
def iscoroutine(obj):
    """Tell whether *obj* is a coroutine object.

    That is either a CoroWrapper instance (as produced in _DEBUG mode)
    or a plain generator object.
    """
    if isinstance(obj, CoroWrapper):
        return True
    return inspect.isgenerator(obj)
108
109
class Task(futures.Future):
    """A coroutine wrapped in a Future.

    Creating a Task schedules the first _step() call on the event loop;
    each _step() resumes the coroutine once and either finishes the
    Task or arranges for the next resumption.
    """

    # An important invariant maintained while a Task is not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # Weak set containing all tasks alive.
    _all_tasks = weakref.WeakSet()

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        if loop is None:
            loop = events.get_event_loop()
        return {t for t in cls._all_tasks if t._loop is loop}

    def __init__(self, coro, *, loop=None):
        """Wrap coroutine object *coro* and schedule its first step.

        *coro* must be a coroutine object (see iscoroutine()), not a
        coroutine function.  The new task is registered in _all_tasks.
        """
        assert iscoroutine(coro), repr(coro)  # Not a coroutine function!
        super().__init__(loop=loop)
        self._coro = iter(coro)  # Use the iterator just in case.
        # Future this task is currently blocked on, or None (see the
        # invariant described at the top of the class).
        self._fut_waiter = None
        # Set by cancel() when the cancellation has to be delivered by
        # the next _step() call.
        self._must_cancel = False
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    def __repr__(self):
        """Return the Future repr annotated with the coroutine's name.

        A pending task with a cancellation request outstanding is shown
        as CANCELLING instead of PENDING.
        """
        res = super().__repr__()
        if (self._must_cancel and
            self._state == futures._PENDING and
            '<PENDING' in res):
            res = res.replace('<PENDING', '<CANCELLING', 1)
        # Insert "(<name>)" just before the first '<' of the base repr,
        # or append it when there is no '<'.
        i = res.find('<')
        if i < 0:
            i = len(res)
        res = res[:i] + '(<{}>)'.format(self._coro.__name__) + res[i:]
        return res

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is active, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        frames = []
        f = self._coro.gi_frame
        if f is not None:
            # Walk from the newest frame outwards, then flip the list so
            # that the oldest frame comes first.
            while f is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(f)
                f = f.f_back
            frames.reverse()
        elif self._exception is not None:
            # A traceback chain already runs from oldest to newest.
            tb = self._exception.__traceback__
            while tb is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(tb.tb_frame)
                tb = tb.tb_next
        return frames

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output goes; by default it goes to sys.stderr.
        """
        extracted_list = []
        checked = set()
        for f in self.get_stack(limit=limit):
            lineno = f.f_lineno
            co = f.f_code
            filename = co.co_filename
            name = co.co_name
            # Refresh the linecache entry once per file.
            if filename not in checked:
                checked.add(filename)
                linecache.checkcache(filename)
            line = linecache.getline(filename, lineno, f.f_globals)
            extracted_list.append((filename, lineno, name, line))
        exc = self._exception
        if not extracted_list:
            print('No stack for %r' % self, file=file)
        elif exc is not None:
            print('Traceback for %r (most recent call last):' % self,
                  file=file)
        else:
            print('Stack for %r (most recent call last):' % self,
                  file=file)
        traceback.print_list(extracted_list, file=file)
        if exc is not None:
            for line in traceback.format_exception_only(exc.__class__, exc):
                print(line, file=file, end='')

    def cancel(self):
        """Request cancellation of this task.

        Returns False if the task is already done.  Otherwise returns
        True: either the future the task is waiting on accepted the
        cancellation, or _must_cancel is set so that the next _step()
        call throws CancelledError into the coroutine.
        """
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, value=None, exc=None):
        """Resume the coroutine once and act on what it does.

        *exc*, if given, is thrown into the coroutine; otherwise *value*
        is sent (or next() is used when *value* is None).  A pending
        cancellation request replaces *exc* with CancelledError unless
        *exc* already is one.

        If the coroutine finishes, the task's result, exception or
        cancelled state is set accordingly.  If it yields, the yielded
        object decides how the next step is scheduled.
        """
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None
        # Call either coro.throw(exc) or coro.send(value).
        try:
            if exc is not None:
                result = coro.throw(exc)
            elif value is not None:
                result = coro.send(value)
            else:
                result = next(coro)
        except StopIteration as exc:
            # The coroutine returned; its return value is the result.
            self.set_result(exc.value)
        except futures.CancelledError as exc:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            # Record the exception on the task, but let it propagate
            # to the caller as well.
            self.set_exception(exc)
            raise
        else:
            if isinstance(result, futures.Future):
                # Yielded Future must come from Future.__iter__().
                if result._blocking:
                    result._blocking = False
                    result.add_done_callback(self._wakeup)
                    self._fut_waiter = result
                    # A cancel() that arrived while the coroutine was
                    # running is forwarded to the new waiter.
                    if self._must_cancel:
                        if self._fut_waiter.cancel():
                            self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step, None,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        self = None  # Drop the local reference to the task.

    def _wakeup(self, future):
        """Done-callback for the awaited future: run the next step.

        The future's result is passed to _step() as the value to send;
        if retrieving it raises, the exception is passed instead.
        """
        try:
            value = future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(None, exc)
        else:
            self._step(value, None)
        self = None  # Needed to break cycles when an exception occurs.
316
317
# wait() and as_completed() similar to those in PEP 3148.

# Values accepted for the return_when argument of wait(); these are the
# same constant objects that concurrent.futures defines.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
323
324
325@coroutine
326def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
327 """Wait for the Futures and coroutines given by fs to complete.
328
329 Coroutines will be wrapped in Tasks.
330
331 Returns two sets of Future: (done, pending).
332
333 Usage:
334
335 done, pending = yield from asyncio.wait(fs)
336
337 Note: This does not raise TimeoutError! Futures that aren't done
338 when the timeout occurs are returned in the second set.
339 """
340 if not fs:
341 raise ValueError('Set of coroutines/Futures is empty.')
342
343 if loop is None:
344 loop = events.get_event_loop()
345
346 fs = set(async(f, loop=loop) for f in fs)
347
348 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
349 raise ValueError('Invalid return_when value: {}'.format(return_when))
350 return (yield from _wait(fs, timeout, return_when, loop))
351
352
353def _release_waiter(waiter, value=True, *args):
354 if not waiter.done():
355 waiter.set_result(value)
356
357
358@coroutine
359def wait_for(fut, timeout, *, loop=None):
360 """Wait for the single Future or coroutine to complete, with timeout.
361
362 Coroutine will be wrapped in Task.
363
364 Returns result of the Future or coroutine. Raises TimeoutError when
365 timeout occurs.
366
367 Usage:
368
369 result = yield from asyncio.wait_for(fut, 10.0)
370
371 """
372 if loop is None:
373 loop = events.get_event_loop()
374
375 waiter = futures.Future(loop=loop)
376 timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False)
377 cb = functools.partial(_release_waiter, waiter, True)
378
379 fut = async(fut, loop=loop)
380 fut.add_done_callback(cb)
381
382 try:
383 if (yield from waiter):
384 return fut.result()
385 else:
386 fut.remove_done_callback(cb)
387 raise futures.TimeoutError()
388 finally:
389 timeout_handle.cancel()
390
391
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper shared by wait() and as_completed().

    The fs argument must be a non-empty collection of Futures.  Returns
    a (done, pending) pair of sets once the return_when condition is
    met or the timeout (if not None) expires.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = futures.Future(loop=loop)
    timer = (loop.call_later(timeout, _release_waiter, waiter)
             if timeout is not None else None)
    remaining = len(fs)

    def _on_completion(f):
        nonlocal remaining
        remaining -= 1
        if remaining > 0 and return_when != FIRST_COMPLETED:
            # Not everything is done and we are not in FIRST_COMPLETED
            # mode: only a real exception in FIRST_EXCEPTION mode may
            # release the waiter early.
            if return_when != FIRST_EXCEPTION:
                return
            if f.cancelled() or f.exception() is None:
                return
        if timer is not None:
            timer.cancel()
        if not waiter.done():
            waiter.set_result(False)

    for fut in fs:
        fut.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timer is not None:
            timer.cancel()

    done = set()
    pending = set()
    for fut in fs:
        fut.remove_done_callback(_on_completion)
        (done if fut.done() else pending).add(fut)
    return done, pending
434
435
436# This is *not* a @coroutine! It is just an iterator (yielding Futures).
437def as_completed(fs, *, loop=None, timeout=None):
438 """Return an iterator whose values, when waited for, are Futures.
439
440 This differs from PEP 3148; the proper way to use this is:
441
442 for f in as_completed(fs):
443 result = yield from f # The 'yield from' may raise.
444 # Use result.
445
446 Raises TimeoutError if the timeout occurs before all Futures are
447 done.
448
449 Note: The futures 'f' are not necessarily members of fs.
450 """
451 loop = loop if loop is not None else events.get_event_loop()
452 deadline = None if timeout is None else loop.time() + timeout
453 todo = set(async(f, loop=loop) for f in fs)
454 completed = collections.deque()
455
456 @coroutine
457 def _wait_for_one():
458 while not completed:
459 timeout = None
460 if deadline is not None:
461 timeout = deadline - loop.time()
462 if timeout < 0:
463 raise futures.TimeoutError()
464 done, pending = yield from _wait(
465 todo, timeout, FIRST_COMPLETED, loop)
466 # Multiple callers might be waiting for the same events
467 # and getting the same outcome. Dedupe by updating todo.
468 for f in done:
469 if f in todo:
470 todo.remove(f)
471 completed.append(f)
472 f = completed.popleft()
473 return f.result() # May raise.
474
475 for _ in range(len(todo)):
476 yield _wait_for_one()
477
478
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after *delay* seconds, returning *result*.

    A timer is armed on the future's loop to set the result; the timer
    is cancelled when the coroutine exits, however it exits.
    """
    waiter = futures.Future(loop=loop)
    timer = waiter._loop.call_later(delay, waiter.set_result, result)
    try:
        outcome = yield from waiter
        return outcome
    finally:
        timer.cancel()
488
489
490def async(coro_or_future, *, loop=None):
491 """Wrap a coroutine in a future.
492
493 If the argument is a Future, it is returned directly.
494 """
495 if isinstance(coro_or_future, futures.Future):
496 if loop is not None and loop is not coro_or_future._loop:
497 raise ValueError('loop argument must agree with Future')
498 return coro_or_future
499 elif iscoroutine(coro_or_future):
500 return Task(coro_or_future, loop=loop)
501 else:
502 raise TypeError('A Future or coroutine is required')
503
504
class _GatheringFuture(futures.Future):
    """Outer future returned by gather().

    Its cancel() does not mark the future itself as cancelled; like
    Task.cancel(), it only forwards the request -- here to every child
    future.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children

    def cancel(self):
        """Cancel all children; return False if already done."""
        if not self.done():
            for fut in self._children:
                fut.cancel()
            return True
        return False
523
524
525def gather(*coros_or_futures, loop=None, return_exceptions=False):
526 """Return a future aggregating results from the given coroutines
527 or futures.
528
529 All futures must share the same event loop. If all the tasks are
530 done successfully, the returned future's result is the list of
531 results (in the order of the original sequence, not necessarily
532 the order of results arrival). If *result_exception* is True,
533 exceptions in the tasks are treated the same as successful
534 results, and gathered in the result list; otherwise, the first
535 raised exception will be immediately propagated to the returned
536 future.
537
538 Cancellation: if the outer Future is cancelled, all children (that
539 have not completed yet) are also cancelled. If any child is
540 cancelled, this is treated as if it raised CancelledError --
541 the outer Future is *not* cancelled in this case. (This is to
542 prevent the cancellation of one child to cause other children to
543 be cancelled.)
544 """
545 children = [async(fut, loop=loop) for fut in coros_or_futures]
546 n = len(children)
547 if n == 0:
548 outer = futures.Future(loop=loop)
549 outer.set_result([])
550 return outer
551 if loop is None:
552 loop = children[0]._loop
553 for fut in children:
554 if fut._loop is not loop:
555 raise ValueError("futures are tied to different event loops")
556 outer = _GatheringFuture(children, loop=loop)
557 nfinished = 0
558 results = [None] * n
559
560 def _done_callback(i, fut):
561 nonlocal nfinished
562 if outer._state != futures._PENDING:
563 if fut._exception is not None:
564 # Mark exception retrieved.
565 fut.exception()
566 return
567 if fut._state == futures._CANCELLED:
568 res = futures.CancelledError()
569 if not return_exceptions:
570 outer.set_exception(res)
571 return
572 elif fut._exception is not None:
573 res = fut.exception() # Mark exception retrieved.
574 if not return_exceptions:
575 outer.set_exception(res)
576 return
577 else:
578 res = fut._result
579 results[i] = res
580 nfinished += 1
581 if nfinished == n:
582 outer.set_result(results)
583
584 for i, fut in enumerate(children):
585 fut.add_done_callback(functools.partial(_done_callback, i))
586 return outer
587
588
589def shield(arg, *, loop=None):
590 """Wait for a future, shielding it from cancellation.
591
592 The statement
593
594 res = yield from shield(something())
595
596 is exactly equivalent to the statement
597
598 res = yield from something()
599
600 *except* that if the coroutine containing it is cancelled, the
601 task running in something() is not cancelled. From the POV of
602 something(), the cancellation did not happen. But its caller is
603 still cancelled, so the yield-from expression still raises
604 CancelledError. Note: If something() is cancelled by other means
605 this will still cancel shield().
606
607 If you want to completely ignore cancellation (not recommended)
608 you can combine shield() with a try/except clause, as follows:
609
610 try:
611 res = yield from shield(something())
612 except CancelledError:
613 res = None
614 """
615 inner = async(arg, loop=loop)
616 if inner.done():
617 # Shortcut.
618 return inner
619 loop = inner._loop
620 outer = futures.Future(loop=loop)
621
622 def _done_callback(inner):
623 if outer.cancelled():
624 # Mark inner's result as retrieved.
625 inner.cancelled() or inner.exception()
626 return
627 if inner.cancelled():
628 outer.cancel()
629 else:
630 exc = inner.exception()
631 if exc is not None:
632 outer.set_exception(exc)
633 else:
634 outer.set_result(inner.result())
635
636 inner.add_done_callback(_done_callback)
637 return outer