blob: b7ee758d64081b28c8c6c8bc2dcd9cc85770f3d0 [file] [log] [blame]
"""Support for tasks, coroutines and the scheduler."""

# Public names of this module: the coroutine decorator and predicates,
# the Task class, the return_when constants accepted by wait(), and the
# helper functions defined below.
__all__ = ['coroutine', 'Task',
           'iscoroutinefunction', 'iscoroutine',
           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
           'wait', 'wait_for', 'as_completed', 'sleep', 'async',
           'gather', 'shield',
           ]
9
10import collections
11import concurrent.futures
12import functools
13import inspect
14import linecache
15import traceback
16import weakref
17
18from . import events
19from . import futures
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070020from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021
# If you set _DEBUG to true, @coroutine will wrap the resulting
# generator objects in a CoroWrapper instance (defined below).  That
# instance will log a message when the generator is never iterated
# over, which may happen when you forget to use "yield from" with a
# coroutine call.  Note that the value of the _DEBUG flag is taken
# when the decorator is used, so to be of any use it must be set
# before you define your coroutines.  A downside of using this feature
# is that tracebacks show entries for the CoroWrapper.__next__ method
# when _DEBUG is true.
_DEBUG = False
32
33
class CoroWrapper:
    # Wrapper for coroutine in _DEBUG mode.
    #
    # It forwards the generator protocol (__next__/send/throw/close) and
    # the generator introspection attributes (gi_frame/gi_running/gi_code)
    # to the wrapped generator, and logs an error from __del__ when the
    # generator was never started.
    #
    # NOTE: this is deliberately a comment rather than a class docstring:
    # '__doc__' is listed in __slots__ (so that each instance can carry the
    # docstring of the wrapped coroutine) and a class-level docstring would
    # conflict with that slot.

    __slots__ = ['gen', 'func', '__name__', '__doc__']

    def __init__(self, gen, func):
        # gen:  the generator object to wrap.
        # func: the function that produced it (used for the log message).
        assert inspect.isgenerator(gen), gen
        self.gen = gen
        self.func = func

    def __iter__(self):
        return self

    def __next__(self):
        return next(self.gen)

    def send(self, value):
        return self.gen.send(value)

    def throw(self, exc, *args):
        # Accept the optional (value, traceback) arguments as well so the
        # wrapper can be used wherever a plain generator's throw() is.
        return self.gen.throw(exc, *args)

    def close(self):
        return self.gen.close()

    # Read-only proxies for the generator attributes, so that code which
    # inspects a coroutine object works the same with or without the
    # wrapper.

    @property
    def gi_frame(self):
        return self.gen.gi_frame

    @property
    def gi_running(self):
        return self.gen.gi_running

    @property
    def gi_code(self):
        return self.gen.gi_code

    def __del__(self):
        # Be careful: if __init__ failed (e.g. its assertion fired) the
        # 'gen' slot was never assigned, so plain attribute access would
        # raise AttributeError during finalization.
        gen = getattr(self, 'gen', None)
        frame = getattr(gen, 'gi_frame', None)
        # f_lasti == -1 means no instruction of the frame has run yet,
        # i.e. the generator was created but never iterated.
        if frame is not None and frame.f_lasti == -1:
            func = self.func
            code = func.__code__
            filename = code.co_filename
            lineno = code.co_firstlineno
            logger.error(
                'Coroutine %r defined at %s:%s was never yielded from',
                func.__name__, filename, lineno)
70
def coroutine(func):
    """Decorator to mark coroutines.

    A generator function is used as is; any other callable is wrapped
    in a generator function that calls it and, when the call returns a
    Future or a generator, delegates to that result with "yield from".

    When _DEBUG is true at decoration time, calling the decorated
    function returns a CoroWrapper, which logs an error message if the
    coroutine is destroyed without ever having been yielded from.
    """
    coro = func
    if not inspect.isgeneratorfunction(func):
        @functools.wraps(func)
        def coro(*args, **kw):
            outcome = func(*args, **kw)
            if not (isinstance(outcome, futures.Future) or
                    inspect.isgenerator(outcome)):
                return outcome
            outcome = yield from outcome
            return outcome

    wrapper = coro
    if _DEBUG:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            wrapped = CoroWrapper(coro(*args, **kwds), func)
            # Make the wrapper object look like the coroutine it wraps.
            wrapped.__name__ = coro.__name__
            wrapped.__doc__ = coro.__doc__
            return wrapped

    wrapper._is_coroutine = True  # For iscoroutinefunction().
    return wrapper
99
100
def iscoroutinefunction(func):
    """Return True if func is a decorated coroutine function.

    The answer is the value of the ``_is_coroutine`` marker attribute
    set by @coroutine; objects without that attribute yield False.
    """
    try:
        return func._is_coroutine
    except AttributeError:
        return False
104
105
def iscoroutine(obj):
    """Return True if obj is a coroutine object.

    A coroutine object is either a CoroWrapper instance or a plain
    generator object.
    """
    if isinstance(obj, CoroWrapper):
        return True
    return inspect.isgenerator(obj)
109
110
class Task(futures.Future):
    """A coroutine wrapped in a Future.

    The coroutine is advanced by _step(), which is scheduled on the
    task's event loop with call_soon().  When the coroutine yields a
    Future, the task records it in _fut_waiter and resumes from
    _wakeup() once that Future completes.
    """

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # Weak set containing all tasks alive.
    _all_tasks = weakref.WeakSet()

    # Dictionary containing tasks that are currently active in
    # all running event loops.  {EventLoop: Task}
    _current_tasks = {}

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        if loop is None:
            loop = events.get_event_loop()
        return cls._current_tasks.get(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        if loop is None:
            loop = events.get_event_loop()
        return {t for t in cls._all_tasks if t._loop is loop}

    def __init__(self, coro, *, loop=None):
        """Wrap the coroutine object coro and schedule its first step.

        coro must be a coroutine *object* (see iscoroutine()), not a
        coroutine function.  The loop argument is passed on to
        Future.__init__().
        """
        assert iscoroutine(coro), repr(coro)  # Not a coroutine function!
        super().__init__(loop=loop)
        self._coro = iter(coro)  # Use the iterator just in case.
        self._fut_waiter = None  # Future the coroutine is waiting for, if any.
        self._must_cancel = False  # Set by cancel(); consumed by _step().
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    def __repr__(self):
        """Return the Future repr with the coroutine's name inserted.

        A pending task with a cancellation request outstanding is shown
        as CANCELLING instead of PENDING.
        """
        res = super().__repr__()
        if (self._must_cancel and
            self._state == futures._PENDING and
            '<PENDING' in res):
            res = res.replace('<PENDING', '<CANCELLING', 1)
        # Insert '(<name>)' right before the first '<' of the repr, or
        # append it when the repr contains no '<' at all.
        i = res.find('<')
        if i < 0:
            i = len(res)
        res = res[:i] + '(<{}>)'.format(self._coro.__name__) + res[i:]
        return res

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is active, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        frames = []
        f = self._coro.gi_frame
        if f is not None:
            # Walk from the newest frame outwards via f_back, then
            # reverse so that the result is ordered oldest-first.
            while f is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(f)
                f = f.f_back
            frames.reverse()
        elif self._exception is not None:
            # No live frame: follow the traceback chain of the stored
            # exception, which is already ordered oldest-first.
            tb = self._exception.__traceback__
            while tb is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(tb.tb_frame)
                tb = tb.tb_next
        return frames

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output goes; by default it goes to sys.stderr.
        """
        extracted_list = []
        checked = set()  # Filenames whose linecache entry was refreshed.
        for f in self.get_stack(limit=limit):
            lineno = f.f_lineno
            co = f.f_code
            filename = co.co_filename
            name = co.co_name
            if filename not in checked:
                checked.add(filename)
                linecache.checkcache(filename)
            line = linecache.getline(filename, lineno, f.f_globals)
            extracted_list.append((filename, lineno, name, line))
        exc = self._exception
        if not extracted_list:
            print('No stack for %r' % self, file=file)
        elif exc is not None:
            print('Traceback for %r (most recent call last):' % self,
                  file=file)
        else:
            print('Stack for %r (most recent call last):' % self,
                  file=file)
        traceback.print_list(extracted_list, file=file)
        if exc is not None:
            for line in traceback.format_exception_only(exc.__class__, exc):
                print(line, file=file, end='')

    def cancel(self):
        """Request that this task be cancelled.

        Returns False if the task is already done.  Otherwise returns
        True after either cancelling the Future the task is currently
        waiting for, or setting _must_cancel so that the next _step()
        throws CancelledError into the coroutine.
        """
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, value=None, exc=None):
        """Advance the coroutine by one step and handle what it yields.

        If exc is not None it is thrown into the coroutine; otherwise
        value is sent (or next() is used when value is None).  A pending
        cancellation request (_must_cancel) replaces exc with a
        CancelledError unless exc already is one.
        """
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        # Register this task as the current one for its loop while the
        # coroutine runs; the finally clause below unregisters it.
        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(value).
        try:
            if exc is not None:
                result = coro.throw(exc)
            elif value is not None:
                result = coro.send(value)
            else:
                result = next(coro)
        except StopIteration as exc:
            # The coroutine returned; its return value is the result.
            self.set_result(exc.value)
        except futures.CancelledError as exc:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            # Record the exception on the task and also let it propagate.
            self.set_exception(exc)
            raise
        else:
            if isinstance(result, futures.Future):
                # Yielded Future must come from Future.__iter__().
                if result._blocking:
                    result._blocking = False
                    result.add_done_callback(self._wakeup)
                    self._fut_waiter = result
                    # cancel() may have been requested while the
                    # coroutine was running; forward it to the Future
                    # we are now waiting for.
                    if self._must_cancel:
                        if self._fut_waiter.cancel():
                            self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step, None,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            # Drop the local reference; presumably for the same reason
            # as in _wakeup() (breaking reference cycles).
            self = None

    def _wakeup(self, future):
        """Done-callback for the awaited Future: resume the coroutine.

        The outcome of future is passed to _step(), either as the value
        to send or as the exception to throw.
        """
        try:
            value = future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(None, exc)
        else:
            self._step(value, None)
        self = None  # Needed to break cycles when an exception occurs.
337
338
# wait() and as_completed() similar to those in PEP 3148.

# The values accepted for the return_when argument of wait(); they are
# the very same objects as the constants of concurrent.futures.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
344
345
346@coroutine
347def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
348 """Wait for the Futures and coroutines given by fs to complete.
349
350 Coroutines will be wrapped in Tasks.
351
352 Returns two sets of Future: (done, pending).
353
354 Usage:
355
356 done, pending = yield from asyncio.wait(fs)
357
358 Note: This does not raise TimeoutError! Futures that aren't done
359 when the timeout occurs are returned in the second set.
360 """
Victor Stinner208556c2014-02-11 11:54:08 +0100361 if isinstance(fs, futures.Future) or iscoroutine(fs):
362 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700363 if not fs:
364 raise ValueError('Set of coroutines/Futures is empty.')
365
366 if loop is None:
367 loop = events.get_event_loop()
368
Yury Selivanov622be342014-02-06 22:06:16 -0500369 fs = {async(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700370
371 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
372 raise ValueError('Invalid return_when value: {}'.format(return_when))
373 return (yield from _wait(fs, timeout, return_when, loop))
374
375
376def _release_waiter(waiter, value=True, *args):
377 if not waiter.done():
378 waiter.set_result(value)
379
380
381@coroutine
382def wait_for(fut, timeout, *, loop=None):
383 """Wait for the single Future or coroutine to complete, with timeout.
384
385 Coroutine will be wrapped in Task.
386
Victor Stinner421e49b2014-01-23 17:40:59 +0100387 Returns result of the Future or coroutine. When a timeout occurs,
388 it cancels the task and raises TimeoutError. To avoid the task
389 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700390
391 Usage:
392
393 result = yield from asyncio.wait_for(fut, 10.0)
394
395 """
396 if loop is None:
397 loop = events.get_event_loop()
398
Guido van Rossum48c66c32014-01-29 14:30:38 -0800399 if timeout is None:
400 return (yield from fut)
401
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700402 waiter = futures.Future(loop=loop)
403 timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False)
404 cb = functools.partial(_release_waiter, waiter, True)
405
406 fut = async(fut, loop=loop)
407 fut.add_done_callback(cb)
408
409 try:
410 if (yield from waiter):
411 return fut.result()
412 else:
413 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100414 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700415 raise futures.TimeoutError()
416 finally:
417 timeout_handle.cancel()
418
419
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a non-empty collection of Futures.
    Returns the pair of sets (done, pending).
    """
    assert fs, 'Set of Futures is empty.'
    waiter = futures.Future(loop=loop)
    timeout_handle = None
    if timeout is not None:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)  # Number of futures that have not completed yet.

    def _on_completion(f):
        nonlocal counter
        counter -= 1
        # Wake up the waiter when everything is done, or earlier when
        # the return_when condition is met by this future.
        if (counter <= 0 or
            return_when == FIRST_COMPLETED or
            return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                f.exception() is not None)):
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(False)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()
        # Detach our callbacks here rather than after the try block, so
        # that they are also removed when the wait is cancelled or fails.
        for f in fs:
            f.remove_done_callback(_on_completion)

    done, pending = set(), set()
    for f in fs:
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending
461 return done, pending
462
463
464# This is *not* a @coroutine! It is just an iterator (yielding Futures).
465def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossum2303fec2014-02-12 17:58:19 -0800466 """Return an iterator whose values are coroutines.
467
468 When waiting for the yielded coroutines you'll get the results (or
469 exceptions!) of the original Futures (or coroutines), in the order
470 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700471
472 This differs from PEP 3148; the proper way to use this is:
473
474 for f in as_completed(fs):
475 result = yield from f # The 'yield from' may raise.
476 # Use result.
477
Guido van Rossum2303fec2014-02-12 17:58:19 -0800478 If a timeout is specified, the 'yield from' will raise
479 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700480
481 Note: The futures 'f' are not necessarily members of fs.
482 """
Victor Stinner208556c2014-02-11 11:54:08 +0100483 if isinstance(fs, futures.Future) or iscoroutine(fs):
484 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700485 loop = loop if loop is not None else events.get_event_loop()
486 deadline = None if timeout is None else loop.time() + timeout
Yury Selivanov622be342014-02-06 22:06:16 -0500487 todo = {async(f, loop=loop) for f in set(fs)}
Guido van Rossum2303fec2014-02-12 17:58:19 -0800488 from .queues import Queue # Import here to avoid circular import problem.
489 done = Queue(loop=loop)
490 timeout_handle = None
491
492 def _on_timeout():
493 for f in todo:
494 f.remove_done_callback(_on_completion)
495 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
496 todo.clear() # Can't do todo.remove(f) in the loop.
497
498 def _on_completion(f):
499 if not todo:
500 return # _on_timeout() was here first.
501 todo.remove(f)
502 done.put_nowait(f)
503 if not todo and timeout_handle is not None:
504 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700505
506 @coroutine
507 def _wait_for_one():
Guido van Rossum2303fec2014-02-12 17:58:19 -0800508 f = yield from done.get()
509 if f is None:
510 # Dummy value from _on_timeout().
511 raise futures.TimeoutError
512 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700513
Guido van Rossum2303fec2014-02-12 17:58:19 -0800514 for f in todo:
515 f.add_done_callback(_on_completion)
516 if todo and timeout is not None:
517 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700518 for _ in range(len(todo)):
519 yield _wait_for_one()
520
521
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    The value of the result argument is returned to the caller once the
    delay has elapsed.
    """
    waiter = futures.Future(loop=loop)
    timer = waiter._loop.call_later(delay, waiter.set_result, result)
    try:
        outcome = yield from waiter
    finally:
        # Cancel the timer in every case, e.g. when the wait above was
        # interrupted by an exception.
        timer.cancel()
    return outcome
531
532
533def async(coro_or_future, *, loop=None):
534 """Wrap a coroutine in a future.
535
536 If the argument is a Future, it is returned directly.
537 """
538 if isinstance(coro_or_future, futures.Future):
539 if loop is not None and loop is not coro_or_future._loop:
540 raise ValueError('loop argument must agree with Future')
541 return coro_or_future
542 elif iscoroutine(coro_or_future):
543 return Task(coro_or_future, loop=loop)
544 else:
545 raise TypeError('A Future or coroutine is required')
546
547
class _GatheringFuture(futures.Future):
    """Helper for gather().

    Its cancel() does not mark this future as cancelled; instead it
    cancels all the children, acting more like Task.cancel(), which
    doesn't immediately mark itself as cancelled either.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children

    def cancel(self):
        """Cancel every child; return False only if already done."""
        if not self.done():
            for child in self._children:
                child.cancel()
            return True
        return False
566
567
568def gather(*coros_or_futures, loop=None, return_exceptions=False):
569 """Return a future aggregating results from the given coroutines
570 or futures.
571
572 All futures must share the same event loop. If all the tasks are
573 done successfully, the returned future's result is the list of
574 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500575 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700576 exceptions in the tasks are treated the same as successful
577 results, and gathered in the result list; otherwise, the first
578 raised exception will be immediately propagated to the returned
579 future.
580
581 Cancellation: if the outer Future is cancelled, all children (that
582 have not completed yet) are also cancelled. If any child is
583 cancelled, this is treated as if it raised CancelledError --
584 the outer Future is *not* cancelled in this case. (This is to
585 prevent the cancellation of one child to cause other children to
586 be cancelled.)
587 """
Yury Selivanov622be342014-02-06 22:06:16 -0500588 arg_to_fut = {arg: async(arg, loop=loop) for arg in set(coros_or_futures)}
589 children = [arg_to_fut[arg] for arg in coros_or_futures]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700590 n = len(children)
591 if n == 0:
592 outer = futures.Future(loop=loop)
593 outer.set_result([])
594 return outer
595 if loop is None:
596 loop = children[0]._loop
597 for fut in children:
598 if fut._loop is not loop:
599 raise ValueError("futures are tied to different event loops")
600 outer = _GatheringFuture(children, loop=loop)
601 nfinished = 0
602 results = [None] * n
603
604 def _done_callback(i, fut):
605 nonlocal nfinished
606 if outer._state != futures._PENDING:
607 if fut._exception is not None:
608 # Mark exception retrieved.
609 fut.exception()
610 return
611 if fut._state == futures._CANCELLED:
612 res = futures.CancelledError()
613 if not return_exceptions:
614 outer.set_exception(res)
615 return
616 elif fut._exception is not None:
617 res = fut.exception() # Mark exception retrieved.
618 if not return_exceptions:
619 outer.set_exception(res)
620 return
621 else:
622 res = fut._result
623 results[i] = res
624 nfinished += 1
625 if nfinished == n:
626 outer.set_result(results)
627
628 for i, fut in enumerate(children):
629 fut.add_done_callback(functools.partial(_done_callback, i))
630 return outer
631
632
633def shield(arg, *, loop=None):
634 """Wait for a future, shielding it from cancellation.
635
636 The statement
637
638 res = yield from shield(something())
639
640 is exactly equivalent to the statement
641
642 res = yield from something()
643
644 *except* that if the coroutine containing it is cancelled, the
645 task running in something() is not cancelled. From the POV of
646 something(), the cancellation did not happen. But its caller is
647 still cancelled, so the yield-from expression still raises
648 CancelledError. Note: If something() is cancelled by other means
649 this will still cancel shield().
650
651 If you want to completely ignore cancellation (not recommended)
652 you can combine shield() with a try/except clause, as follows:
653
654 try:
655 res = yield from shield(something())
656 except CancelledError:
657 res = None
658 """
659 inner = async(arg, loop=loop)
660 if inner.done():
661 # Shortcut.
662 return inner
663 loop = inner._loop
664 outer = futures.Future(loop=loop)
665
666 def _done_callback(inner):
667 if outer.cancelled():
668 # Mark inner's result as retrieved.
669 inner.cancelled() or inner.exception()
670 return
671 if inner.cancelled():
672 outer.cancel()
673 else:
674 exc = inner.exception()
675 if exc is not None:
676 outer.set_exception(exc)
677 else:
678 outer.set_result(inner.result())
679
680 inner.add_done_callback(_done_callback)
681 return outer