blob: cd9718f534a71210139da9c576a3ea448794fdba [file] [log] [blame]
"""Support for tasks, coroutines and the scheduler."""

# Names exported by a star-import of this module.
__all__ = ['coroutine', 'Task',
           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
           'wait', 'wait_for', 'as_completed', 'sleep', 'async',
           'gather', 'shield',
           ]
8
9import collections
10import concurrent.futures
11import functools
12import inspect
13import linecache
14import traceback
15import weakref
16
17from . import events
18from . import futures
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070019from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
# If you set _DEBUG to true, @coroutine will wrap the resulting
# generator objects in a CoroWrapper instance (defined below).  That
# instance will log a message when the generator is never iterated
# over, which may happen when you forget to use "yield from" with a
# coroutine call.
#
# Note that the value of the _DEBUG flag is read when the decorator
# is applied, so to be of any use it must be set before you define
# your coroutines.
#
# A downside of using this feature is that tracebacks show entries
# for the CoroWrapper.__next__ method when _DEBUG is true.
_DEBUG = False
31
32
class CoroWrapper:
    # Wrapper for a coroutine (generator) object, used in _DEBUG mode.
    #
    # It forwards the generator protocol (__next__/send/throw/close) to
    # the wrapped generator and, when garbage collected, logs an error
    # if the generator was never started.
    #
    # This is deliberately a comment rather than a docstring: '__doc__'
    # is listed in __slots__ (the coroutine() decorator assigns a
    # per-instance __doc__), and a slot may not share its name with a
    # class attribute such as a class docstring.

    # 'gen' and 'func' hold the wrapped generator and the function that
    # produced it.  '__name__' and '__doc__' are assigned per instance by
    # the coroutine() decorator.  '__weakref__' keeps instances
    # weak-referenceable.
    __slots__ = ['gen', 'func', '__name__', '__doc__', '__weakref__']

    def __init__(self, gen, func):
        assert inspect.isgenerator(gen), gen
        self.gen = gen
        self.func = func

    def __iter__(self):
        return self

    def __next__(self):
        return next(self.gen)

    def send(self, value):
        return self.gen.send(value)

    def throw(self, exc, *args):
        # Accept the same optional (value, traceback) arguments as
        # generator.throw() and forward exactly what was given.
        return self.gen.throw(exc, *args)

    def close(self):
        return self.gen.close()

    def __del__(self):
        # Be careful accessing self.gen: __init__ may have failed before
        # the attribute was set, and __del__ still runs in that case.
        gen = getattr(self, 'gen', None)
        frame = getattr(gen, 'gi_frame', None)
        # f_lasti == -1 means the generator frame has not executed any
        # instruction, i.e. the coroutine was created but never iterated.
        if frame is not None and frame.f_lasti == -1:
            func = self.func
            code = func.__code__
            filename = code.co_filename
            lineno = code.co_firstlineno
            logger.error(
                'Coroutine %r defined at %s:%s was never yielded from',
                func.__name__, filename, lineno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070068
69
def coroutine(func):
    """Decorator that marks a function as a coroutine.

    A generator function is used as is.  Any other callable is wrapped
    in a generator function that calls it and, when the call returns a
    Future or a generator, delegates to it with ``yield from``.

    When the module-level _DEBUG flag is true at decoration time, each
    call returns a CoroWrapper, which logs an error if the coroutine is
    destroyed without ever having been iterated.
    """
    if inspect.isgeneratorfunction(func):
        coro = func
    else:
        @functools.wraps(func)
        def coro(*args, **kw):
            outcome = func(*args, **kw)
            if not (isinstance(outcome, futures.Future) or
                    inspect.isgenerator(outcome)):
                return outcome
            return (yield from outcome)

    if _DEBUG:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            wrapped = CoroWrapper(coro(*args, **kwds), func)
            wrapped.__name__ = coro.__name__
            wrapped.__doc__ = coro.__doc__
            return wrapped
    else:
        wrapper = coro

    # Marker attribute consulted by iscoroutinefunction().
    wrapper._is_coroutine = True
    return wrapper
98
99
def iscoroutinefunction(func):
    """Tell whether func was decorated with @coroutine.

    Returns the value of the function's ``_is_coroutine`` marker, or
    False when the marker is absent.
    """
    try:
        return func._is_coroutine
    except AttributeError:
        return False
103
104
def iscoroutine(obj):
    """Tell whether obj is a coroutine object.

    A coroutine object is either a CoroWrapper instance or a plain
    generator object.
    """
    if isinstance(obj, CoroWrapper):
        return True
    return inspect.isgenerator(obj)
108
109
class Task(futures.Future):
    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # Weak set containing all tasks alive.
    _all_tasks = weakref.WeakSet()

    # Dictionary containing tasks that are currently active in
    # all running event loops.  {EventLoop: Task}
    _current_tasks = {}

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        if loop is None:
            loop = events.get_event_loop()
        return cls._current_tasks.get(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        if loop is None:
            loop = events.get_event_loop()
        return {t for t in cls._all_tasks if t._loop is loop}

    def __init__(self, coro, *, loop=None):
        # Wrap the coroutine object `coro`, schedule its first step on
        # the loop and register the task in the class-wide weak set.
        assert iscoroutine(coro), repr(coro)  # Not a coroutine function!
        super().__init__(loop=loop)
        self._coro = iter(coro)  # Use the iterator just in case.
        # Future this task is currently waiting on, if any.
        self._fut_waiter = None
        # Set by cancel() when cancellation must be delivered by the
        # next _step() call.
        self._must_cancel = False
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    def __repr__(self):
        # Start from the Future repr, show <CANCELLING instead of
        # <PENDING while a cancellation is pending, and insert the
        # coroutine's name in front of the first '<'.
        res = super().__repr__()
        if (self._must_cancel and
            self._state == futures._PENDING and
            '<PENDING' in res):
            res = res.replace('<PENDING', '<CANCELLING', 1)
        i = res.find('<')
        if i < 0:
            i = len(res)
        res = res[:i] + '(<{}>)'.format(self._coro.__name__) + res[i:]
        return res

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is active, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        frames = []
        f = self._coro.gi_frame
        if f is not None:
            # Walk from the newest frame outwards via f_back, then
            # reverse so the result is ordered oldest to newest.
            while f is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(f)
                f = f.f_back
            frames.reverse()
        elif self._exception is not None:
            # The traceback chain is followed with tb_next, so the
            # frames are appended in order without reversing.
            tb = self._exception.__traceback__
            while tb is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(tb.tb_frame)
                tb = tb.tb_next
        return frames

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output goes; by default it goes to sys.stderr.
        """
        extracted_list = []
        # File names whose linecache entry has already been checked.
        checked = set()
        for f in self.get_stack(limit=limit):
            lineno = f.f_lineno
            co = f.f_code
            filename = co.co_filename
            name = co.co_name
            if filename not in checked:
                checked.add(filename)
                linecache.checkcache(filename)
            line = linecache.getline(filename, lineno, f.f_globals)
            extracted_list.append((filename, lineno, name, line))
        exc = self._exception
        if not extracted_list:
            print('No stack for %r' % self, file=file)
        elif exc is not None:
            print('Traceback for %r (most recent call last):' % self,
                  file=file)
        else:
            print('Stack for %r (most recent call last):' % self,
                  file=file)
        traceback.print_list(extracted_list, file=file)
        if exc is not None:
            for line in traceback.format_exception_only(exc.__class__, exc):
                print(line, file=file, end='')

    def cancel(self):
        """Request cancellation of this task.

        Returns False if the task is already done.  Otherwise returns
        True after either cancelling the Future the task is waiting on
        or setting the _must_cancel flag for the next _step() call.
        """
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, value=None, exc=None):
        # Advance the coroutine by one step: throw `exc` into it if
        # given, else send `value` if not None, else call next().  The
        # outcome either completes this task or arranges the next step.
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
        if self._must_cancel:
            # A cancellation is pending: deliver it as CancelledError,
            # reusing `exc` when it already is one.
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        # Record this task as the one running on its loop; the entry is
        # removed again in the finally clause below.
        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(value).
        try:
            if exc is not None:
                result = coro.throw(exc)
            elif value is not None:
                result = coro.send(value)
            else:
                result = next(coro)
        except StopIteration as exc:
            # The coroutine returned; its return value is the result.
            self.set_result(exc.value)
        except futures.CancelledError as exc:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            # Non-Exception errors are recorded and also re-raised.
            self.set_exception(exc)
            raise
        else:
            if isinstance(result, futures.Future):
                # Yielded Future must come from Future.__iter__().
                if result._blocking:
                    result._blocking = False
                    result.add_done_callback(self._wakeup)
                    self._fut_waiter = result
                    if self._must_cancel:
                        # cancel() was called while the coroutine was
                        # running; pass it on to the awaited Future.
                        if self._fut_waiter.cancel():
                            self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step, None,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
        self = None  # Drop the local reference to the task.

    def _wakeup(self, future):
        # Done-callback attached to the awaited Future: resume the
        # coroutine with the Future's result, or with its exception.
        try:
            value = future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(None, exc)
        else:
            self._step(value, None)
        self = None  # Needed to break cycles when an exception occurs.
336
337
# wait() and as_completed() similar to those in PEP 3148.

# Valid values for the return_when argument of wait(); these are the
# same objects as the constants of the same names in concurrent.futures.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
343
344
345@coroutine
346def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
347 """Wait for the Futures and coroutines given by fs to complete.
348
349 Coroutines will be wrapped in Tasks.
350
351 Returns two sets of Future: (done, pending).
352
353 Usage:
354
355 done, pending = yield from asyncio.wait(fs)
356
357 Note: This does not raise TimeoutError! Futures that aren't done
358 when the timeout occurs are returned in the second set.
359 """
360 if not fs:
361 raise ValueError('Set of coroutines/Futures is empty.')
362
363 if loop is None:
364 loop = events.get_event_loop()
365
366 fs = set(async(f, loop=loop) for f in fs)
367
368 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
369 raise ValueError('Invalid return_when value: {}'.format(return_when))
370 return (yield from _wait(fs, timeout, return_when, loop))
371
372
373def _release_waiter(waiter, value=True, *args):
374 if not waiter.done():
375 waiter.set_result(value)
376
377
378@coroutine
379def wait_for(fut, timeout, *, loop=None):
380 """Wait for the single Future or coroutine to complete, with timeout.
381
382 Coroutine will be wrapped in Task.
383
384 Returns result of the Future or coroutine. Raises TimeoutError when
385 timeout occurs.
386
387 Usage:
388
389 result = yield from asyncio.wait_for(fut, 10.0)
390
391 """
392 if loop is None:
393 loop = events.get_event_loop()
394
395 waiter = futures.Future(loop=loop)
396 timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False)
397 cb = functools.partial(_release_waiter, waiter, True)
398
399 fut = async(fut, loop=loop)
400 fut.add_done_callback(cb)
401
402 try:
403 if (yield from waiter):
404 return fut.result()
405 else:
406 fut.remove_done_callback(cb)
407 raise futures.TimeoutError()
408 finally:
409 timeout_handle.cancel()
410
411
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Shared implementation behind wait() and as_completed().

    fs must be a non-empty collection of Futures.  Returns the pair of
    sets (done, pending) once the return_when condition is met or the
    timeout expires.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = futures.Future(loop=loop)
    if timeout is None:
        timeout_handle = None
    else:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)

    def _on_completion(f):
        nonlocal counter
        counter -= 1
        # Decide whether this completion satisfies return_when.
        satisfied = counter <= 0 or return_when == FIRST_COMPLETED
        if not satisfied and return_when == FIRST_EXCEPTION:
            satisfied = not f.cancelled() and f.exception() is not None
        if not satisfied:
            return
        if timeout_handle is not None:
            timeout_handle.cancel()
        if not waiter.done():
            waiter.set_result(False)

    for fut in fs:
        fut.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()

    # Detach the callback from every future and sort them by state.
    done = set()
    pending = set()
    for fut in fs:
        fut.remove_done_callback(_on_completion)
        bucket = done if fut.done() else pending
        bucket.add(fut)
    return done, pending
454
455
456# This is *not* a @coroutine! It is just an iterator (yielding Futures).
457def as_completed(fs, *, loop=None, timeout=None):
458 """Return an iterator whose values, when waited for, are Futures.
459
460 This differs from PEP 3148; the proper way to use this is:
461
462 for f in as_completed(fs):
463 result = yield from f # The 'yield from' may raise.
464 # Use result.
465
466 Raises TimeoutError if the timeout occurs before all Futures are
467 done.
468
469 Note: The futures 'f' are not necessarily members of fs.
470 """
471 loop = loop if loop is not None else events.get_event_loop()
472 deadline = None if timeout is None else loop.time() + timeout
473 todo = set(async(f, loop=loop) for f in fs)
474 completed = collections.deque()
475
476 @coroutine
477 def _wait_for_one():
478 while not completed:
479 timeout = None
480 if deadline is not None:
481 timeout = deadline - loop.time()
482 if timeout < 0:
483 raise futures.TimeoutError()
484 done, pending = yield from _wait(
485 todo, timeout, FIRST_COMPLETED, loop)
486 # Multiple callers might be waiting for the same events
487 # and getting the same outcome. Dedupe by updating todo.
488 for f in done:
489 if f in todo:
490 todo.remove(f)
491 completed.append(f)
492 f = completed.popleft()
493 return f.result() # May raise.
494
495 for _ in range(len(todo)):
496 yield _wait_for_one()
497
498
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that finishes after delay seconds, returning result."""
    waiter = futures.Future(loop=loop)
    timer = waiter._loop.call_later(delay, waiter.set_result, result)
    try:
        outcome = yield from waiter
    finally:
        # Always drop the timer, including when the wait is interrupted.
        timer.cancel()
    return outcome
508
509
510def async(coro_or_future, *, loop=None):
511 """Wrap a coroutine in a future.
512
513 If the argument is a Future, it is returned directly.
514 """
515 if isinstance(coro_or_future, futures.Future):
516 if loop is not None and loop is not coro_or_future._loop:
517 raise ValueError('loop argument must agree with Future')
518 return coro_or_future
519 elif iscoroutine(coro_or_future):
520 return Task(coro_or_future, loop=loop)
521 else:
522 raise TypeError('A Future or coroutine is required')
523
524
class _GatheringFuture(futures.Future):
    """Outer future returned by gather().

    Its cancel() forwards the request to every child instead of marking
    this future cancelled right away, which is closer to how
    Task.cancel() behaves.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children

    def cancel(self):
        if not self.done():
            for fut in self._children:
                fut.cancel()
            return True
        return False
543
544
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines
    or futures.

    All futures must share the same event loop.  If all the tasks are
    done successfully, the returned future's result is the list of
    results (in the order of the original sequence, not necessarily
    the order of results arrival).  If *return_exceptions* is True,
    exceptions in the tasks are treated the same as successful
    results, and gathered in the result list; otherwise, the first
    raised exception will be immediately propagated to the returned
    future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)
    """
    children = [async(fut, loop=loop) for fut in coros_or_futures]
    n = len(children)
    if n == 0:
        # Nothing to wait for: return an already completed future.
        outer = futures.Future(loop=loop)
        outer.set_result([])
        return outer
    if loop is None:
        loop = children[0]._loop
    for fut in children:
        if fut._loop is not loop:
            raise ValueError("futures are tied to different event loops")
    outer = _GatheringFuture(children, loop=loop)
    nfinished = 0
    # One slot per child, filled in by index as children complete.
    results = [None] * n

    def _done_callback(i, fut):
        nonlocal nfinished
        if outer._state != futures._PENDING:
            # The outer future is already resolved; nothing more to
            # record for this child.
            if fut._exception is not None:
                # Mark exception retrieved.
                fut.exception()
            return
        if fut._state == futures._CANCELLED:
            res = futures.CancelledError()
            if not return_exceptions:
                outer.set_exception(res)
                return
        elif fut._exception is not None:
            res = fut.exception()  # Mark exception retrieved.
            if not return_exceptions:
                outer.set_exception(res)
                return
        else:
            res = fut._result
        results[i] = res
        nfinished += 1
        if nfinished == n:
            outer.set_result(results)

    for i, fut in enumerate(children):
        # Bind the child's index so the callback knows which slot to fill.
        fut.add_done_callback(functools.partial(_done_callback, i))
    return outer
607
608
609def shield(arg, *, loop=None):
610 """Wait for a future, shielding it from cancellation.
611
612 The statement
613
614 res = yield from shield(something())
615
616 is exactly equivalent to the statement
617
618 res = yield from something()
619
620 *except* that if the coroutine containing it is cancelled, the
621 task running in something() is not cancelled. From the POV of
622 something(), the cancellation did not happen. But its caller is
623 still cancelled, so the yield-from expression still raises
624 CancelledError. Note: If something() is cancelled by other means
625 this will still cancel shield().
626
627 If you want to completely ignore cancellation (not recommended)
628 you can combine shield() with a try/except clause, as follows:
629
630 try:
631 res = yield from shield(something())
632 except CancelledError:
633 res = None
634 """
635 inner = async(arg, loop=loop)
636 if inner.done():
637 # Shortcut.
638 return inner
639 loop = inner._loop
640 outer = futures.Future(loop=loop)
641
642 def _done_callback(inner):
643 if outer.cancelled():
644 # Mark inner's result as retrieved.
645 inner.cancelled() or inner.exception()
646 return
647 if inner.cancelled():
648 outer.cancel()
649 else:
650 exc = inner.exception()
651 if exc is not None:
652 outer.set_exception(exc)
653 else:
654 outer.set_result(inner.result())
655
656 inner.add_done_callback(_done_callback)
657 return outer