blob: 81a125f44d77f96d5dc9d063011bbb376012cbf6 [file] [log] [blame]
"""Support for tasks, coroutines and the scheduler."""

# Public names of this module.
__all__ = ['coroutine', 'Task',
           'iscoroutinefunction', 'iscoroutine',
           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
           'wait', 'wait_for', 'as_completed', 'sleep', 'async',
           'gather', 'shield',
           ]
9
10import collections
11import concurrent.futures
12import functools
13import inspect
14import linecache
15import traceback
16import weakref
17
18from . import events
19from . import futures
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070020from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021
# If you set _DEBUG to true, @coroutine will wrap the resulting
# generator objects in a CoroWrapper instance (defined below).  That
# instance will log a message when the generator is never iterated
# over, which may happen when you forget to use "yield from" with a
# coroutine call.  Note that the value of the _DEBUG flag is read
# when the decorator is applied, so to be of any use it must be set
# before you define your coroutines.  A downside of using this feature
# is that tracebacks show entries for the CoroWrapper.__next__ method
# when _DEBUG is true.
_DEBUG = False
32
33
34class CoroWrapper:
Guido van Rossume1f55442014-01-16 11:05:23 -080035 # Wrapper for coroutine in _DEBUG mode.
36
Victor Stinnerbac77932014-01-16 01:55:29 +010037 __slots__ = ['gen', 'func', '__name__', '__doc__']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038
39 def __init__(self, gen, func):
40 assert inspect.isgenerator(gen), gen
41 self.gen = gen
42 self.func = func
43
44 def __iter__(self):
45 return self
46
47 def __next__(self):
48 return next(self.gen)
49
50 def send(self, value):
51 return self.gen.send(value)
52
53 def throw(self, exc):
54 return self.gen.throw(exc)
55
56 def close(self):
57 return self.gen.close()
58
59 def __del__(self):
60 frame = self.gen.gi_frame
61 if frame is not None and frame.f_lasti == -1:
62 func = self.func
63 code = func.__code__
64 filename = code.co_filename
65 lineno = code.co_firstlineno
Guido van Rossum2b430b82013-11-01 14:13:30 -070066 logger.error(
67 'Coroutine %r defined at %s:%s was never yielded from',
68 func.__name__, filename, lineno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070069
70
def coroutine(func):
    """Decorator that marks *func* as a coroutine function.

    A generator function is used as it is.  Any other callable is
    wrapped in a generator that calls it and, if the result is a
    Future or a generator, delegates to that result with "yield from".

    When _DEBUG is true at decoration time, calls return a CoroWrapper,
    which logs an error if the coroutine is destroyed without ever
    having been yielded from.
    """
    if not inspect.isgeneratorfunction(func):
        @functools.wraps(func)
        def coro(*args, **kw):
            outcome = func(*args, **kw)
            needs_delegation = (isinstance(outcome, futures.Future) or
                                inspect.isgenerator(outcome))
            if needs_delegation:
                outcome = yield from outcome
            return outcome
    else:
        coro = func

    if _DEBUG:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            wrapped = CoroWrapper(coro(*args, **kwds), func)
            wrapped.__name__ = coro.__name__
            wrapped.__doc__ = coro.__doc__
            return wrapped
    else:
        wrapper = coro

    wrapper._is_coroutine = True  # For iscoroutinefunction().
    return wrapper
99
100
def iscoroutinefunction(func):
    """Return True if func is a decorated coroutine function.

    The check reads the _is_coroutine marker that @coroutine sets on
    the functions it returns; objects without the marker give False.
    """
    try:
        return func._is_coroutine
    except AttributeError:
        return False
104
105
def iscoroutine(obj):
    """Return True if obj is a coroutine object.

    A coroutine object is either a CoroWrapper (debug mode) or a plain
    generator object.
    """
    if isinstance(obj, CoroWrapper):
        return True
    return inspect.isgenerator(obj)
109
110
class Task(futures.Future):
    """A Future that runs a coroutine, one step at a time, on its loop."""

    # Invariant that holds for as long as the Task is not done:
    #
    # - either _fut_waiter is None and a call to _step() is scheduled;
    # - or _fut_waiter is a Future and no call to _step() is scheduled.
    #
    # Going from the second state back to the first happens only in
    # _wakeup().  Whenever _fut_waiter is not None, _wakeup() must be
    # one of its callbacks.

    # Weak set of every Task that is still alive.
    _all_tasks = weakref.WeakSet()

    # The Task being stepped right now in each running event loop,
    # as {EventLoop: Task}.
    _current_tasks = {}

    @classmethod
    def current_task(cls, loop=None):
        """Return the task currently running in an event loop, or None.

        When *loop* is omitted the current event loop is used.

        None is returned when the call is not made from within a Task.
        """
        target = events.get_event_loop() if loop is None else loop
        return cls._current_tasks.get(target)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return the set of all live tasks that belong to an event loop.

        When *loop* is omitted the current event loop is used.
        """
        if loop is None:
            loop = events.get_event_loop()
        matching = set()
        for task in cls._all_tasks:
            if task._loop is loop:
                matching.add(task)
        return matching

    def __init__(self, coro, *, loop=None):
        """Wrap coroutine object *coro* and schedule its first step."""
        assert iscoroutine(coro), repr(coro)  # Not a coroutine function!
        super().__init__(loop=loop)
        self._coro = iter(coro)  # Use the iterator just in case.
        self._must_cancel = False
        self._fut_waiter = None
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    def __repr__(self):
        text = super().__repr__()
        cancelling = self._must_cancel and self._state == futures._PENDING
        if cancelling and '<PENDING' in text:
            # A cancellation was requested but not yet delivered.
            text = text.replace('<PENDING', '<CANCELLING', 1)
        # Insert the coroutine name just before the first '<', or at
        # the very end when the base repr contains no '<'.
        cut = text.find('<')
        if cut < 0:
            cut = len(text)
        label = '(<{}>)'.format(self._coro.__name__)
        return text[:cut] + label + text[cut:]

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        While the coroutine still has a frame, the result is the stack
        at the point where it is suspended.  Once the frame is gone,
        the result is the list of traceback frames if the task ended
        with an exception, and an empty list otherwise (successful
        completion or cancellation).

        Frames are always ordered from oldest to newest.

        The optional *limit* is the maximum number of frames to
        return; by default every available frame is returned.  Its
        meaning depends on what is being returned: for a stack the
        newest frames are kept, for a traceback the oldest ones.
        (This matches the behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        frames = []
        remaining = limit
        frame = self._coro.gi_frame
        if frame is not None:
            # Walk from the newest frame outwards, then flip the list.
            while frame is not None and (remaining is None or remaining > 0):
                if remaining is not None:
                    remaining -= 1
                frames.append(frame)
                frame = frame.f_back
            frames.reverse()
        elif self._exception is not None:
            # Tracebacks are already linked from oldest to newest.
            tb = self._exception.__traceback__
            while tb is not None and (remaining is None or remaining > 0):
                if remaining is not None:
                    remaining -= 1
                frames.append(tb.tb_frame)
                tb = tb.tb_next
        return frames

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        The output is similar to that of the traceback module, for the
        frames retrieved by get_stack().  *limit* is passed on to
        get_stack().  *file* is the I/O stream the output is written
        to; by default it goes to sys.stderr.
        """
        entries = []
        refreshed = set()
        for frame in self.get_stack(limit=limit):
            code = frame.f_code
            filename = code.co_filename
            lineno = frame.f_lineno
            if filename not in refreshed:
                # Refresh the cached source once per file.
                refreshed.add(filename)
                linecache.checkcache(filename)
            source = linecache.getline(filename, lineno, frame.f_globals)
            entries.append((filename, lineno, code.co_name, source))

        exc = self._exception
        if not entries:
            header = 'No stack for %r' % self
        elif exc is not None:
            header = 'Traceback for %r (most recent call last):' % self
        else:
            header = 'Stack for %r (most recent call last):' % self
        print(header, file=file)

        traceback.print_list(entries, file=file)
        if exc is not None:
            for text in traceback.format_exception_only(exc.__class__, exc):
                print(text, file=file, end='')

    def cancel(self):
        """Request cancellation of the task.

        Returns False if the task is already done, True otherwise.  If
        the task is waiting on a Future, that Future is cancelled;
        failing that, a flag is set so that the next _step() throws
        CancelledError into the coroutine.
        """
        if self.done():
            return False
        waiter = self._fut_waiter
        if waiter is not None and waiter.cancel():
            # Leave self._fut_waiter; it may be a Task that
            # catches and ignores the cancellation so we may have
            # to cancel it again later.
            return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, value=None, exc=None):
        """Advance the coroutine by one step and act on what it yields."""
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
        if self._must_cancel:
            # Deliver the pending cancellation request.
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(value).
        try:
            if exc is not None:
                yielded = coro.throw(exc)
            elif value is not None:
                yielded = coro.send(value)
            else:
                yielded = next(coro)
        except StopIteration as exc:
            self.set_result(exc.value)
        except futures.CancelledError as exc:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            self.set_exception(exc)
            raise
        else:
            if isinstance(yielded, futures.Future):
                # Yielded Future must come from Future.__iter__().
                if yielded._blocking:
                    yielded._blocking = False
                    yielded.add_done_callback(self._wakeup)
                    self._fut_waiter = yielded
                    # A cancel() may have arrived while the coroutine
                    # was running; pass it on to the new waiter.
                    if self._must_cancel and self._fut_waiter.cancel():
                        self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step, None,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, yielded)))
            elif yielded is None:
                # Bare yield relinquishes control for one event loop
                # iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(yielded):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, yielded)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(yielded)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
        self = None

    def _wakeup(self, future):
        """Done-callback: resume the coroutine with *future*'s outcome."""
        try:
            outcome = future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(None, exc)
        else:
            self._step(outcome, None)
        self = None  # Needed to break cycles when an exception occurs.
337
338
# wait() and as_completed() similar to those in PEP 3148.

# Values accepted for the return_when argument of wait(); they are
# the same objects as the constants in concurrent.futures.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
344
345
346@coroutine
347def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
348 """Wait for the Futures and coroutines given by fs to complete.
349
350 Coroutines will be wrapped in Tasks.
351
352 Returns two sets of Future: (done, pending).
353
354 Usage:
355
356 done, pending = yield from asyncio.wait(fs)
357
358 Note: This does not raise TimeoutError! Futures that aren't done
359 when the timeout occurs are returned in the second set.
360 """
Victor Stinner208556c2014-02-11 11:54:08 +0100361 if isinstance(fs, futures.Future) or iscoroutine(fs):
362 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700363 if not fs:
364 raise ValueError('Set of coroutines/Futures is empty.')
365
366 if loop is None:
367 loop = events.get_event_loop()
368
Yury Selivanov622be342014-02-06 22:06:16 -0500369 fs = {async(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700370
371 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
372 raise ValueError('Invalid return_when value: {}'.format(return_when))
373 return (yield from _wait(fs, timeout, return_when, loop))
374
375
def _release_waiter(waiter, value=True, *args):
    """Set *value* as the result of *waiter* unless it is already done.

    Extra positional arguments are accepted and ignored, so the
    function can be used directly as a done-callback.
    """
    if waiter.done():
        return
    waiter.set_result(value)
379
380
381@coroutine
382def wait_for(fut, timeout, *, loop=None):
383 """Wait for the single Future or coroutine to complete, with timeout.
384
385 Coroutine will be wrapped in Task.
386
Victor Stinner421e49b2014-01-23 17:40:59 +0100387 Returns result of the Future or coroutine. When a timeout occurs,
388 it cancels the task and raises TimeoutError. To avoid the task
389 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700390
391 Usage:
392
393 result = yield from asyncio.wait_for(fut, 10.0)
394
395 """
396 if loop is None:
397 loop = events.get_event_loop()
398
Guido van Rossum48c66c32014-01-29 14:30:38 -0800399 if timeout is None:
400 return (yield from fut)
401
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700402 waiter = futures.Future(loop=loop)
403 timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False)
404 cb = functools.partial(_release_waiter, waiter, True)
405
406 fut = async(fut, loop=loop)
407 fut.add_done_callback(cb)
408
409 try:
410 if (yield from waiter):
411 return fut.result()
412 else:
413 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100414 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700415 raise futures.TimeoutError()
416 finally:
417 timeout_handle.cancel()
418
419
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait() and as_completed().

    The fs argument must be a non-empty collection of Futures.

    Returns (done, pending), two sets that partition fs according to
    the state of each Future at the time the wait ends.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = futures.Future(loop=loop)
    timeout_handle = None
    if timeout is not None:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)

    def _on_completion(f):
        nonlocal counter
        counter -= 1
        if (counter <= 0 or
            return_when == FIRST_COMPLETED or
            return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                f.exception() is not None)):
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(False)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()
        # Detach from the futures on every exit path, including the
        # case where waiting is interrupted by an exception such as a
        # cancellation; otherwise the callbacks would stay attached to
        # futures that outlive this call.
        for f in fs:
            f.remove_done_callback(_on_completion)

    done, pending = set(), set()
    for f in fs:
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending
462
463
464# This is *not* a @coroutine! It is just an iterator (yielding Futures).
465def as_completed(fs, *, loop=None, timeout=None):
466 """Return an iterator whose values, when waited for, are Futures.
467
468 This differs from PEP 3148; the proper way to use this is:
469
470 for f in as_completed(fs):
471 result = yield from f # The 'yield from' may raise.
472 # Use result.
473
474 Raises TimeoutError if the timeout occurs before all Futures are
475 done.
476
477 Note: The futures 'f' are not necessarily members of fs.
478 """
Victor Stinner208556c2014-02-11 11:54:08 +0100479 if isinstance(fs, futures.Future) or iscoroutine(fs):
480 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700481 loop = loop if loop is not None else events.get_event_loop()
482 deadline = None if timeout is None else loop.time() + timeout
Yury Selivanov622be342014-02-06 22:06:16 -0500483 todo = {async(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700484 completed = collections.deque()
485
486 @coroutine
487 def _wait_for_one():
488 while not completed:
489 timeout = None
490 if deadline is not None:
491 timeout = deadline - loop.time()
492 if timeout < 0:
493 raise futures.TimeoutError()
494 done, pending = yield from _wait(
495 todo, timeout, FIRST_COMPLETED, loop)
496 # Multiple callers might be waiting for the same events
497 # and getting the same outcome. Dedupe by updating todo.
498 for f in done:
499 if f in todo:
500 todo.remove(f)
501 completed.append(f)
502 f = completed.popleft()
503 return f.result() # May raise.
504
505 for _ in range(len(todo)):
506 yield _wait_for_one()
507
508
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    *result* is the value returned to the caller once the delay has
    elapsed.
    """
    waiter = futures.Future(loop=loop)
    timer = waiter._loop.call_later(delay, waiter.set_result, result)
    try:
        value = yield from waiter
        return value
    finally:
        # Drop the timer if the wait ends early (e.g. on cancellation).
        timer.cancel()
518
519
520def async(coro_or_future, *, loop=None):
521 """Wrap a coroutine in a future.
522
523 If the argument is a Future, it is returned directly.
524 """
525 if isinstance(coro_or_future, futures.Future):
526 if loop is not None and loop is not coro_or_future._loop:
527 raise ValueError('loop argument must agree with Future')
528 return coro_or_future
529 elif iscoroutine(coro_or_future):
530 return Task(coro_or_future, loop=loop)
531 else:
532 raise TypeError('A Future or coroutine is required')
533
534
class _GatheringFuture(futures.Future):
    """Future returned by gather().

    cancel() is overridden so that it cancels every child instead of
    marking this future as cancelled right away, which makes it behave
    more like Task.cancel().
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children

    def cancel(self):
        """Cancel all children; return False if already done."""
        if not self.done():
            for child in self._children:
                child.cancel()
            return True
        return False
553
554
555def gather(*coros_or_futures, loop=None, return_exceptions=False):
556 """Return a future aggregating results from the given coroutines
557 or futures.
558
559 All futures must share the same event loop. If all the tasks are
560 done successfully, the returned future's result is the list of
561 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500562 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700563 exceptions in the tasks are treated the same as successful
564 results, and gathered in the result list; otherwise, the first
565 raised exception will be immediately propagated to the returned
566 future.
567
568 Cancellation: if the outer Future is cancelled, all children (that
569 have not completed yet) are also cancelled. If any child is
570 cancelled, this is treated as if it raised CancelledError --
571 the outer Future is *not* cancelled in this case. (This is to
572 prevent the cancellation of one child to cause other children to
573 be cancelled.)
574 """
Yury Selivanov622be342014-02-06 22:06:16 -0500575 arg_to_fut = {arg: async(arg, loop=loop) for arg in set(coros_or_futures)}
576 children = [arg_to_fut[arg] for arg in coros_or_futures]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700577 n = len(children)
578 if n == 0:
579 outer = futures.Future(loop=loop)
580 outer.set_result([])
581 return outer
582 if loop is None:
583 loop = children[0]._loop
584 for fut in children:
585 if fut._loop is not loop:
586 raise ValueError("futures are tied to different event loops")
587 outer = _GatheringFuture(children, loop=loop)
588 nfinished = 0
589 results = [None] * n
590
591 def _done_callback(i, fut):
592 nonlocal nfinished
593 if outer._state != futures._PENDING:
594 if fut._exception is not None:
595 # Mark exception retrieved.
596 fut.exception()
597 return
598 if fut._state == futures._CANCELLED:
599 res = futures.CancelledError()
600 if not return_exceptions:
601 outer.set_exception(res)
602 return
603 elif fut._exception is not None:
604 res = fut.exception() # Mark exception retrieved.
605 if not return_exceptions:
606 outer.set_exception(res)
607 return
608 else:
609 res = fut._result
610 results[i] = res
611 nfinished += 1
612 if nfinished == n:
613 outer.set_result(results)
614
615 for i, fut in enumerate(children):
616 fut.add_done_callback(functools.partial(_done_callback, i))
617 return outer
618
619
620def shield(arg, *, loop=None):
621 """Wait for a future, shielding it from cancellation.
622
623 The statement
624
625 res = yield from shield(something())
626
627 is exactly equivalent to the statement
628
629 res = yield from something()
630
631 *except* that if the coroutine containing it is cancelled, the
632 task running in something() is not cancelled. From the POV of
633 something(), the cancellation did not happen. But its caller is
634 still cancelled, so the yield-from expression still raises
635 CancelledError. Note: If something() is cancelled by other means
636 this will still cancel shield().
637
638 If you want to completely ignore cancellation (not recommended)
639 you can combine shield() with a try/except clause, as follows:
640
641 try:
642 res = yield from shield(something())
643 except CancelledError:
644 res = None
645 """
646 inner = async(arg, loop=loop)
647 if inner.done():
648 # Shortcut.
649 return inner
650 loop = inner._loop
651 outer = futures.Future(loop=loop)
652
653 def _done_callback(inner):
654 if outer.cancelled():
655 # Mark inner's result as retrieved.
656 inner.cancelled() or inner.exception()
657 return
658 if inner.cancelled():
659 outer.cancel()
660 else:
661 exc = inner.exception()
662 if exc is not None:
663 outer.set_exception(exc)
664 else:
665 outer.set_result(inner.result())
666
667 inner.add_done_callback(_done_callback)
668 return outer