blob: 5ad06520e9a4e5cd92ebed42d2ce102597244844 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
3__all__ = ['coroutine', 'Task',
Guido van Rossum68816ef2013-12-28 08:06:40 -10004 'iscoroutinefunction', 'iscoroutine',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
6 'wait', 'wait_for', 'as_completed', 'sleep', 'async',
Guido van Rossumde3a1362013-11-29 09:29:00 -08007 'gather', 'shield',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008 ]
9
10import collections
11import concurrent.futures
12import functools
13import inspect
14import linecache
15import traceback
16import weakref
17
18from . import events
19from . import futures
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070020from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021
22# If you set _DEBUG to true, @coroutine will wrap the resulting
23# generator objects in a CoroWrapper instance (defined below). That
24# instance will log a message when the generator is never iterated
25# over, which may happen when you forget to use "yield from" with a
26# coroutine call. Note that the value of the _DEBUG flag is taken
27# when the decorator is used, so to be of any use it must be set
28# before you define your coroutines. A downside of using this feature
29# is that tracebacks show entries for the CoroWrapper.__next__ method
30# when _DEBUG is true.
31_DEBUG = False
32
33
34class CoroWrapper:
Guido van Rossume1f55442014-01-16 11:05:23 -080035 # Wrapper for coroutine in _DEBUG mode.
36
Victor Stinnerbac77932014-01-16 01:55:29 +010037 __slots__ = ['gen', 'func', '__name__', '__doc__']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038
39 def __init__(self, gen, func):
40 assert inspect.isgenerator(gen), gen
41 self.gen = gen
42 self.func = func
43
44 def __iter__(self):
45 return self
46
47 def __next__(self):
48 return next(self.gen)
49
50 def send(self, value):
51 return self.gen.send(value)
52
53 def throw(self, exc):
54 return self.gen.throw(exc)
55
56 def close(self):
57 return self.gen.close()
58
59 def __del__(self):
60 frame = self.gen.gi_frame
61 if frame is not None and frame.f_lasti == -1:
62 func = self.func
63 code = func.__code__
64 filename = code.co_filename
65 lineno = code.co_firstlineno
Guido van Rossum2b430b82013-11-01 14:13:30 -070066 logger.error(
67 'Coroutine %r defined at %s:%s was never yielded from',
68 func.__name__, filename, lineno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070069
70
71def coroutine(func):
72 """Decorator to mark coroutines.
73
74 If the coroutine is not yielded from before it is destroyed,
75 an error message is logged.
76 """
77 if inspect.isgeneratorfunction(func):
78 coro = func
79 else:
80 @functools.wraps(func)
81 def coro(*args, **kw):
82 res = func(*args, **kw)
83 if isinstance(res, futures.Future) or inspect.isgenerator(res):
84 res = yield from res
85 return res
86
87 if not _DEBUG:
88 wrapper = coro
89 else:
90 @functools.wraps(func)
91 def wrapper(*args, **kwds):
92 w = CoroWrapper(coro(*args, **kwds), func)
93 w.__name__ = coro.__name__
94 w.__doc__ = coro.__doc__
95 return w
96
97 wrapper._is_coroutine = True # For iscoroutinefunction().
98 return wrapper
99
100
101def iscoroutinefunction(func):
102 """Return True if func is a decorated coroutine function."""
103 return getattr(func, '_is_coroutine', False)
104
105
106def iscoroutine(obj):
107 """Return True if obj is a coroutine object."""
108 return isinstance(obj, CoroWrapper) or inspect.isgenerator(obj)
109
110
111class Task(futures.Future):
112 """A coroutine wrapped in a Future."""
113
114 # An important invariant maintained while a Task not done:
115 #
116 # - Either _fut_waiter is None, and _step() is scheduled;
117 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
118 #
119 # The only transition from the latter to the former is through
120 # _wakeup(). When _fut_waiter is not None, one of its callbacks
121 # must be _wakeup().
122
123 # Weak set containing all tasks alive.
124 _all_tasks = weakref.WeakSet()
125
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800126 # Dictionary containing tasks that are currently active in
127 # all running event loops. {EventLoop: Task}
128 _current_tasks = {}
129
130 @classmethod
131 def current_task(cls, loop=None):
132 """Return the currently running task in an event loop or None.
133
134 By default the current task for the current event loop is returned.
135
136 None is returned when called not in the context of a Task.
137 """
138 if loop is None:
139 loop = events.get_event_loop()
140 return cls._current_tasks.get(loop)
141
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700142 @classmethod
143 def all_tasks(cls, loop=None):
144 """Return a set of all tasks for an event loop.
145
146 By default all tasks for the current event loop are returned.
147 """
148 if loop is None:
149 loop = events.get_event_loop()
150 return {t for t in cls._all_tasks if t._loop is loop}
151
152 def __init__(self, coro, *, loop=None):
153 assert iscoroutine(coro), repr(coro) # Not a coroutine function!
154 super().__init__(loop=loop)
155 self._coro = iter(coro) # Use the iterator just in case.
156 self._fut_waiter = None
157 self._must_cancel = False
158 self._loop.call_soon(self._step)
159 self.__class__._all_tasks.add(self)
160
161 def __repr__(self):
162 res = super().__repr__()
163 if (self._must_cancel and
164 self._state == futures._PENDING and
165 '<PENDING' in res):
166 res = res.replace('<PENDING', '<CANCELLING', 1)
167 i = res.find('<')
168 if i < 0:
169 i = len(res)
170 res = res[:i] + '(<{}>)'.format(self._coro.__name__) + res[i:]
171 return res
172
173 def get_stack(self, *, limit=None):
174 """Return the list of stack frames for this task's coroutine.
175
176 If the coroutine is active, this returns the stack where it is
177 suspended. If the coroutine has completed successfully or was
178 cancelled, this returns an empty list. If the coroutine was
179 terminated by an exception, this returns the list of traceback
180 frames.
181
182 The frames are always ordered from oldest to newest.
183
184 The optional limit gives the maximum nummber of frames to
185 return; by default all available frames are returned. Its
186 meaning differs depending on whether a stack or a traceback is
187 returned: the newest frames of a stack are returned, but the
188 oldest frames of a traceback are returned. (This matches the
189 behavior of the traceback module.)
190
191 For reasons beyond our control, only one stack frame is
192 returned for a suspended coroutine.
193 """
194 frames = []
195 f = self._coro.gi_frame
196 if f is not None:
197 while f is not None:
198 if limit is not None:
199 if limit <= 0:
200 break
201 limit -= 1
202 frames.append(f)
203 f = f.f_back
204 frames.reverse()
205 elif self._exception is not None:
206 tb = self._exception.__traceback__
207 while tb is not None:
208 if limit is not None:
209 if limit <= 0:
210 break
211 limit -= 1
212 frames.append(tb.tb_frame)
213 tb = tb.tb_next
214 return frames
215
216 def print_stack(self, *, limit=None, file=None):
217 """Print the stack or traceback for this task's coroutine.
218
219 This produces output similar to that of the traceback module,
220 for the frames retrieved by get_stack(). The limit argument
221 is passed to get_stack(). The file argument is an I/O stream
222 to which the output goes; by default it goes to sys.stderr.
223 """
224 extracted_list = []
225 checked = set()
226 for f in self.get_stack(limit=limit):
227 lineno = f.f_lineno
228 co = f.f_code
229 filename = co.co_filename
230 name = co.co_name
231 if filename not in checked:
232 checked.add(filename)
233 linecache.checkcache(filename)
234 line = linecache.getline(filename, lineno, f.f_globals)
235 extracted_list.append((filename, lineno, name, line))
236 exc = self._exception
237 if not extracted_list:
238 print('No stack for %r' % self, file=file)
239 elif exc is not None:
240 print('Traceback for %r (most recent call last):' % self,
241 file=file)
242 else:
243 print('Stack for %r (most recent call last):' % self,
244 file=file)
245 traceback.print_list(extracted_list, file=file)
246 if exc is not None:
247 for line in traceback.format_exception_only(exc.__class__, exc):
248 print(line, file=file, end='')
249
250 def cancel(self):
251 if self.done():
252 return False
253 if self._fut_waiter is not None:
254 if self._fut_waiter.cancel():
255 # Leave self._fut_waiter; it may be a Task that
256 # catches and ignores the cancellation so we may have
257 # to cancel it again later.
258 return True
259 # It must be the case that self._step is already scheduled.
260 self._must_cancel = True
261 return True
262
263 def _step(self, value=None, exc=None):
264 assert not self.done(), \
265 '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
266 if self._must_cancel:
267 if not isinstance(exc, futures.CancelledError):
268 exc = futures.CancelledError()
269 self._must_cancel = False
270 coro = self._coro
271 self._fut_waiter = None
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800272
273 self.__class__._current_tasks[self._loop] = self
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700274 # Call either coro.throw(exc) or coro.send(value).
275 try:
276 if exc is not None:
277 result = coro.throw(exc)
278 elif value is not None:
279 result = coro.send(value)
280 else:
281 result = next(coro)
282 except StopIteration as exc:
283 self.set_result(exc.value)
284 except futures.CancelledError as exc:
285 super().cancel() # I.e., Future.cancel(self).
286 except Exception as exc:
287 self.set_exception(exc)
288 except BaseException as exc:
289 self.set_exception(exc)
290 raise
291 else:
292 if isinstance(result, futures.Future):
293 # Yielded Future must come from Future.__iter__().
294 if result._blocking:
295 result._blocking = False
296 result.add_done_callback(self._wakeup)
297 self._fut_waiter = result
298 if self._must_cancel:
299 if self._fut_waiter.cancel():
300 self._must_cancel = False
301 else:
302 self._loop.call_soon(
303 self._step, None,
304 RuntimeError(
305 'yield was used instead of yield from '
306 'in task {!r} with {!r}'.format(self, result)))
307 elif result is None:
308 # Bare yield relinquishes control for one event loop iteration.
309 self._loop.call_soon(self._step)
310 elif inspect.isgenerator(result):
311 # Yielding a generator is just wrong.
312 self._loop.call_soon(
313 self._step, None,
314 RuntimeError(
315 'yield was used instead of yield from for '
316 'generator in task {!r} with {}'.format(
317 self, result)))
318 else:
319 # Yielding something else is an error.
320 self._loop.call_soon(
321 self._step, None,
322 RuntimeError(
323 'Task got bad yield: {!r}'.format(result)))
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800324 finally:
325 self.__class__._current_tasks.pop(self._loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700326 self = None
327
328 def _wakeup(self, future):
329 try:
330 value = future.result()
331 except Exception as exc:
332 # This may also be a cancellation.
333 self._step(None, exc)
334 else:
335 self._step(value, None)
336 self = None # Needed to break cycles when an exception occurs.
337
338
339# wait() and as_completed() similar to those in PEP 3148.
340
341FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
342FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
343ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
344
345
346@coroutine
347def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
348 """Wait for the Futures and coroutines given by fs to complete.
349
350 Coroutines will be wrapped in Tasks.
351
352 Returns two sets of Future: (done, pending).
353
354 Usage:
355
356 done, pending = yield from asyncio.wait(fs)
357
358 Note: This does not raise TimeoutError! Futures that aren't done
359 when the timeout occurs are returned in the second set.
360 """
361 if not fs:
362 raise ValueError('Set of coroutines/Futures is empty.')
363
364 if loop is None:
365 loop = events.get_event_loop()
366
Yury Selivanov622be342014-02-06 22:06:16 -0500367 fs = {async(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700368
369 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
370 raise ValueError('Invalid return_when value: {}'.format(return_when))
371 return (yield from _wait(fs, timeout, return_when, loop))
372
373
374def _release_waiter(waiter, value=True, *args):
375 if not waiter.done():
376 waiter.set_result(value)
377
378
379@coroutine
380def wait_for(fut, timeout, *, loop=None):
381 """Wait for the single Future or coroutine to complete, with timeout.
382
383 Coroutine will be wrapped in Task.
384
Victor Stinner421e49b2014-01-23 17:40:59 +0100385 Returns result of the Future or coroutine. When a timeout occurs,
386 it cancels the task and raises TimeoutError. To avoid the task
387 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700388
389 Usage:
390
391 result = yield from asyncio.wait_for(fut, 10.0)
392
393 """
394 if loop is None:
395 loop = events.get_event_loop()
396
Guido van Rossum48c66c32014-01-29 14:30:38 -0800397 if timeout is None:
398 return (yield from fut)
399
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700400 waiter = futures.Future(loop=loop)
401 timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False)
402 cb = functools.partial(_release_waiter, waiter, True)
403
404 fut = async(fut, loop=loop)
405 fut.add_done_callback(cb)
406
407 try:
408 if (yield from waiter):
409 return fut.result()
410 else:
411 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100412 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700413 raise futures.TimeoutError()
414 finally:
415 timeout_handle.cancel()
416
417
418@coroutine
419def _wait(fs, timeout, return_when, loop):
420 """Internal helper for wait() and _wait_for().
421
422 The fs argument must be a collection of Futures.
423 """
424 assert fs, 'Set of Futures is empty.'
425 waiter = futures.Future(loop=loop)
426 timeout_handle = None
427 if timeout is not None:
428 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
429 counter = len(fs)
430
431 def _on_completion(f):
432 nonlocal counter
433 counter -= 1
434 if (counter <= 0 or
435 return_when == FIRST_COMPLETED or
436 return_when == FIRST_EXCEPTION and (not f.cancelled() and
437 f.exception() is not None)):
438 if timeout_handle is not None:
439 timeout_handle.cancel()
440 if not waiter.done():
441 waiter.set_result(False)
442
443 for f in fs:
444 f.add_done_callback(_on_completion)
445
446 try:
447 yield from waiter
448 finally:
449 if timeout_handle is not None:
450 timeout_handle.cancel()
451
452 done, pending = set(), set()
453 for f in fs:
454 f.remove_done_callback(_on_completion)
455 if f.done():
456 done.add(f)
457 else:
458 pending.add(f)
459 return done, pending
460
461
462# This is *not* a @coroutine! It is just an iterator (yielding Futures).
463def as_completed(fs, *, loop=None, timeout=None):
464 """Return an iterator whose values, when waited for, are Futures.
465
466 This differs from PEP 3148; the proper way to use this is:
467
468 for f in as_completed(fs):
469 result = yield from f # The 'yield from' may raise.
470 # Use result.
471
472 Raises TimeoutError if the timeout occurs before all Futures are
473 done.
474
475 Note: The futures 'f' are not necessarily members of fs.
476 """
477 loop = loop if loop is not None else events.get_event_loop()
478 deadline = None if timeout is None else loop.time() + timeout
Yury Selivanov622be342014-02-06 22:06:16 -0500479 todo = {async(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700480 completed = collections.deque()
481
482 @coroutine
483 def _wait_for_one():
484 while not completed:
485 timeout = None
486 if deadline is not None:
487 timeout = deadline - loop.time()
488 if timeout < 0:
489 raise futures.TimeoutError()
490 done, pending = yield from _wait(
491 todo, timeout, FIRST_COMPLETED, loop)
492 # Multiple callers might be waiting for the same events
493 # and getting the same outcome. Dedupe by updating todo.
494 for f in done:
495 if f in todo:
496 todo.remove(f)
497 completed.append(f)
498 f = completed.popleft()
499 return f.result() # May raise.
500
501 for _ in range(len(todo)):
502 yield _wait_for_one()
503
504
505@coroutine
506def sleep(delay, result=None, *, loop=None):
507 """Coroutine that completes after a given time (in seconds)."""
508 future = futures.Future(loop=loop)
509 h = future._loop.call_later(delay, future.set_result, result)
510 try:
511 return (yield from future)
512 finally:
513 h.cancel()
514
515
516def async(coro_or_future, *, loop=None):
517 """Wrap a coroutine in a future.
518
519 If the argument is a Future, it is returned directly.
520 """
521 if isinstance(coro_or_future, futures.Future):
522 if loop is not None and loop is not coro_or_future._loop:
523 raise ValueError('loop argument must agree with Future')
524 return coro_or_future
525 elif iscoroutine(coro_or_future):
526 return Task(coro_or_future, loop=loop)
527 else:
528 raise TypeError('A Future or coroutine is required')
529
530
531class _GatheringFuture(futures.Future):
532 """Helper for gather().
533
534 This overrides cancel() to cancel all the children and act more
535 like Task.cancel(), which doesn't immediately mark itself as
536 cancelled.
537 """
538
539 def __init__(self, children, *, loop=None):
540 super().__init__(loop=loop)
541 self._children = children
542
543 def cancel(self):
544 if self.done():
545 return False
546 for child in self._children:
547 child.cancel()
548 return True
549
550
551def gather(*coros_or_futures, loop=None, return_exceptions=False):
552 """Return a future aggregating results from the given coroutines
553 or futures.
554
555 All futures must share the same event loop. If all the tasks are
556 done successfully, the returned future's result is the list of
557 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500558 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700559 exceptions in the tasks are treated the same as successful
560 results, and gathered in the result list; otherwise, the first
561 raised exception will be immediately propagated to the returned
562 future.
563
564 Cancellation: if the outer Future is cancelled, all children (that
565 have not completed yet) are also cancelled. If any child is
566 cancelled, this is treated as if it raised CancelledError --
567 the outer Future is *not* cancelled in this case. (This is to
568 prevent the cancellation of one child to cause other children to
569 be cancelled.)
570 """
Yury Selivanov622be342014-02-06 22:06:16 -0500571 arg_to_fut = {arg: async(arg, loop=loop) for arg in set(coros_or_futures)}
572 children = [arg_to_fut[arg] for arg in coros_or_futures]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700573 n = len(children)
574 if n == 0:
575 outer = futures.Future(loop=loop)
576 outer.set_result([])
577 return outer
578 if loop is None:
579 loop = children[0]._loop
580 for fut in children:
581 if fut._loop is not loop:
582 raise ValueError("futures are tied to different event loops")
583 outer = _GatheringFuture(children, loop=loop)
584 nfinished = 0
585 results = [None] * n
586
587 def _done_callback(i, fut):
588 nonlocal nfinished
589 if outer._state != futures._PENDING:
590 if fut._exception is not None:
591 # Mark exception retrieved.
592 fut.exception()
593 return
594 if fut._state == futures._CANCELLED:
595 res = futures.CancelledError()
596 if not return_exceptions:
597 outer.set_exception(res)
598 return
599 elif fut._exception is not None:
600 res = fut.exception() # Mark exception retrieved.
601 if not return_exceptions:
602 outer.set_exception(res)
603 return
604 else:
605 res = fut._result
606 results[i] = res
607 nfinished += 1
608 if nfinished == n:
609 outer.set_result(results)
610
611 for i, fut in enumerate(children):
612 fut.add_done_callback(functools.partial(_done_callback, i))
613 return outer
614
615
616def shield(arg, *, loop=None):
617 """Wait for a future, shielding it from cancellation.
618
619 The statement
620
621 res = yield from shield(something())
622
623 is exactly equivalent to the statement
624
625 res = yield from something()
626
627 *except* that if the coroutine containing it is cancelled, the
628 task running in something() is not cancelled. From the POV of
629 something(), the cancellation did not happen. But its caller is
630 still cancelled, so the yield-from expression still raises
631 CancelledError. Note: If something() is cancelled by other means
632 this will still cancel shield().
633
634 If you want to completely ignore cancellation (not recommended)
635 you can combine shield() with a try/except clause, as follows:
636
637 try:
638 res = yield from shield(something())
639 except CancelledError:
640 res = None
641 """
642 inner = async(arg, loop=loop)
643 if inner.done():
644 # Shortcut.
645 return inner
646 loop = inner._loop
647 outer = futures.Future(loop=loop)
648
649 def _done_callback(inner):
650 if outer.cancelled():
651 # Mark inner's result as retrieved.
652 inner.cancelled() or inner.exception()
653 return
654 if inner.cancelled():
655 outer.cancel()
656 else:
657 exc = inner.exception()
658 if exc is not None:
659 outer.set_exception(exc)
660 else:
661 outer.set_result(inner.result())
662
663 inner.add_done_callback(_done_callback)
664 return outer