blob: b52933fc6c652d85614159908940a781652b6886 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
# Names exported by "from <this module> import *".
__all__ = ['coroutine', 'Task',
           'iscoroutinefunction', 'iscoroutine',
           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
           'wait', 'wait_for', 'as_completed', 'sleep', 'async',
           'gather', 'shield',
           ]
9
10import collections
11import concurrent.futures
12import functools
13import inspect
14import linecache
15import traceback
16import weakref
17
18from . import events
19from . import futures
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070020from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021
# If you set _DEBUG to true, @coroutine will wrap the resulting
# generator objects in a CoroWrapper instance (defined below).  That
# instance will log a message when the generator is never iterated
# over, which may happen when you forget to use "yield from" with a
# coroutine call.  Note that the value of the _DEBUG flag is read
# when the decorator is applied, so to be of any use it must be set
# before you define your coroutines.  A downside of using this feature
# is that tracebacks show entries for the CoroWrapper.__next__ method
# when _DEBUG is true.
_DEBUG = False
32
33
class CoroWrapper:
    # Wrapper for coroutine in _DEBUG mode.
    #
    # It forwards the generator protocol (__next__, send, throw, close)
    # to the wrapped generator and, when garbage collected, logs an
    # error if the generator was never started.
    #
    # NOTE: this is deliberately a comment rather than a class docstring:
    # '__doc__' is listed in __slots__ (so coroutine() can copy the
    # decorated function's docstring onto each instance), and a class
    # docstring would conflict with that slot.

    __slots__ = ['gen', 'func', '__name__', '__doc__']

    def __init__(self, gen, func):
        """Wrap generator object *gen* created by coroutine function *func*."""
        assert inspect.isgenerator(gen), gen
        self.gen = gen
        self.func = func

    def __iter__(self):
        return self

    def __next__(self):
        return next(self.gen)

    def send(self, value):
        return self.gen.send(value)

    def throw(self, exc, value=None, tb=None):
        """Raise an exception inside the wrapped generator.

        Accepts both the single-argument form ``throw(exc)`` and the
        full generator signature ``throw(type, value, traceback)`` so
        the wrapper is a drop-in replacement for the generator itself.
        """
        if value is None and tb is None:
            return self.gen.throw(exc)
        return self.gen.throw(exc, value, tb)

    def close(self):
        return self.gen.close()

    def __del__(self):
        # Be careful accessing self.gen: if __init__ failed (e.g. its
        # assertion fired) the slot was never assigned.
        gen = getattr(self, 'gen', None)
        frame = getattr(gen, 'gi_frame', None)
        # f_lasti == -1 means no bytecode of the generator has run yet.
        if frame is not None and frame.f_lasti == -1:
            func = self.func
            code = func.__code__
            filename = code.co_filename
            lineno = code.co_firstlineno
            logger.error(
                'Coroutine %r defined at %s:%s was never yielded from',
                func.__name__, filename, lineno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070069
70
def coroutine(func):
    """Decorator to mark coroutines.

    A generator function is used as is.  Any other callable is wrapped
    in a generator function that calls it and, if the result is a
    Future, a generator or a CoroWrapper, delegates to it with
    "yield from"; the final value becomes the coroutine's result.

    When the module-level _DEBUG flag is true at decoration time, each
    call returns a CoroWrapper, which logs an error if the coroutine is
    destroyed without ever having been yielded from.
    """
    if inspect.isgeneratorfunction(func):
        coro = func
    else:
        @functools.wraps(func)
        def coro(*args, **kw):
            res = func(*args, **kw)
            # CoroWrapper is what decorated coroutines return in _DEBUG
            # mode; it is not a generator, so it is checked explicitly.
            if (isinstance(res, futures.Future) or
                    inspect.isgenerator(res) or
                    isinstance(res, CoroWrapper)):
                res = yield from res
            return res

    if not _DEBUG:
        wrapper = coro
    else:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            w = CoroWrapper(coro(*args, **kwds), func)
            w.__name__ = coro.__name__
            w.__doc__ = coro.__doc__
            return w

    wrapper._is_coroutine = True  # For iscoroutinefunction().
    return wrapper
99
100
def iscoroutinefunction(func):
    """Return True if func is a decorated coroutine function.

    The answer is the value of the ``_is_coroutine`` marker attribute,
    or False when func has no such attribute.
    """
    try:
        return func._is_coroutine
    except AttributeError:
        return False
104
105
def iscoroutine(obj):
    """Return True if obj is a coroutine object.

    A coroutine object is either a CoroWrapper instance or a generator
    object.
    """
    if isinstance(obj, CoroWrapper):
        return True
    return inspect.isgenerator(obj)
109
110
class Task(futures.Future):
    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # Weak set containing all tasks alive.
    _all_tasks = weakref.WeakSet()

    # Dictionary containing tasks that are currently active in
    # all running event loops.  {EventLoop: Task}
    _current_tasks = {}

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        if loop is None:
            loop = events.get_event_loop()
        return cls._current_tasks.get(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        if loop is None:
            loop = events.get_event_loop()
        return {t for t in cls._all_tasks if t._loop is loop}

    def __init__(self, coro, *, loop=None):
        """Wrap coroutine object *coro* and schedule its first step.

        *coro* must be a coroutine object (as accepted by iscoroutine()),
        not a coroutine function.  The first call to _step() is scheduled
        with loop.call_soon() and the task is added to _all_tasks.
        """
        assert iscoroutine(coro), repr(coro)  # Not a coroutine function!
        super().__init__(loop=loop)
        self._coro = iter(coro)  # Use the iterator just in case.
        self._fut_waiter = None
        self._must_cancel = False
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    def __repr__(self):
        """Return the Future repr with the coroutine's name inserted.

        A pending task with a cancellation request outstanding is shown
        as CANCELLING instead of PENDING.
        """
        res = super().__repr__()
        if (self._must_cancel and
            self._state == futures._PENDING and
            '<PENDING' in res):
            res = res.replace('<PENDING', '<CANCELLING', 1)
        # Insert "(<name>)" just before the first '<', or append it when
        # the base repr contains no '<'.
        i = res.find('<')
        if i < 0:
            i = len(res)
        res = res[:i] + '(<{}>)'.format(self._coro.__name__) + res[i:]
        return res

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is active, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        frames = []
        f = self._coro.gi_frame
        if f is not None:
            # Walk from the newest frame backwards, then reverse so the
            # result is ordered oldest to newest.
            while f is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(f)
                f = f.f_back
            frames.reverse()
        elif self._exception is not None:
            # Traceback entries are already linked oldest to newest.
            tb = self._exception.__traceback__
            while tb is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(tb.tb_frame)
                tb = tb.tb_next
        return frames

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output goes; by default it goes to sys.stderr.
        """
        extracted_list = []
        checked = set()
        for f in self.get_stack(limit=limit):
            lineno = f.f_lineno
            co = f.f_code
            filename = co.co_filename
            name = co.co_name
            # Refresh the linecache entry once per file.
            if filename not in checked:
                checked.add(filename)
                linecache.checkcache(filename)
            line = linecache.getline(filename, lineno, f.f_globals)
            extracted_list.append((filename, lineno, name, line))
        exc = self._exception
        if not extracted_list:
            print('No stack for %r' % self, file=file)
        elif exc is not None:
            print('Traceback for %r (most recent call last):' % self,
                  file=file)
        else:
            print('Stack for %r (most recent call last):' % self,
                  file=file)
        traceback.print_list(extracted_list, file=file)
        if exc is not None:
            for line in traceback.format_exception_only(exc.__class__, exc):
                print(line, file=file, end='')

    def cancel(self):
        """Request cancellation of this task.

        Returns False if the task is already done, True otherwise.  If
        the task is waiting on a future and cancelling that future
        succeeds, nothing more is recorded here; otherwise _must_cancel
        is set so that the next _step() throws CancelledError into the
        coroutine.
        """
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, value=None, exc=None):
        """Run the coroutine until its next yield, return or exception.

        If *exc* is given it is thrown into the coroutine; otherwise
        *value* is sent (or next() is used when value is None).  A
        pending cancellation request replaces *exc* with CancelledError.
        The outcome either completes the task or arranges for the next
        _step() call (directly via call_soon(), or via _wakeup() once
        the yielded future is done).
        """
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        # Record this task as the one running on its loop for the
        # duration of the step; removed again in the finally clause.
        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(value).
        try:
            if exc is not None:
                result = coro.throw(exc)
            elif value is not None:
                result = coro.send(value)
            else:
                result = next(coro)
        except StopIteration as exc:
            # The coroutine returned; its return value is the result.
            self.set_result(exc.value)
        except futures.CancelledError as exc:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            # Record the exception on the task, then let it propagate.
            self.set_exception(exc)
            raise
        else:
            if isinstance(result, futures.Future):
                # Yielded Future must come from Future.__iter__().
                if result._blocking:
                    result._blocking = False
                    result.add_done_callback(self._wakeup)
                    self._fut_waiter = result
                    # A cancel() that arrived during this step is
                    # forwarded to the future we are now waiting on.
                    if self._must_cancel:
                        if self._fut_waiter.cancel():
                            self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step, None,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            # Drop the local reference to the task.
            self = None

    def _wakeup(self, future):
        """Done-callback for the awaited future: resume the coroutine.

        The future's result is passed to _step() as the value to send;
        if retrieving it raises an Exception, that exception is passed
        to _step() to be thrown into the coroutine instead.
        """
        try:
            value = future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(None, exc)
        else:
            self._step(value, None)
        self = None  # Needed to break cycles when an exception occurs.
337
338
# wait() and as_completed() similar to those in PEP 3148.

# Accepted values for the return_when argument of wait(); these are
# the very same objects as the concurrent.futures constants.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
344
345
346@coroutine
347def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
348 """Wait for the Futures and coroutines given by fs to complete.
349
350 Coroutines will be wrapped in Tasks.
351
352 Returns two sets of Future: (done, pending).
353
354 Usage:
355
356 done, pending = yield from asyncio.wait(fs)
357
358 Note: This does not raise TimeoutError! Futures that aren't done
359 when the timeout occurs are returned in the second set.
360 """
361 if not fs:
362 raise ValueError('Set of coroutines/Futures is empty.')
363
364 if loop is None:
365 loop = events.get_event_loop()
366
367 fs = set(async(f, loop=loop) for f in fs)
368
369 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
370 raise ValueError('Invalid return_when value: {}'.format(return_when))
371 return (yield from _wait(fs, timeout, return_when, loop))
372
373
374def _release_waiter(waiter, value=True, *args):
375 if not waiter.done():
376 waiter.set_result(value)
377
378
379@coroutine
380def wait_for(fut, timeout, *, loop=None):
381 """Wait for the single Future or coroutine to complete, with timeout.
382
383 Coroutine will be wrapped in Task.
384
Victor Stinner421e49b2014-01-23 17:40:59 +0100385 Returns result of the Future or coroutine. When a timeout occurs,
386 it cancels the task and raises TimeoutError. To avoid the task
387 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700388
389 Usage:
390
391 result = yield from asyncio.wait_for(fut, 10.0)
392
393 """
394 if loop is None:
395 loop = events.get_event_loop()
396
397 waiter = futures.Future(loop=loop)
398 timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False)
399 cb = functools.partial(_release_waiter, waiter, True)
400
401 fut = async(fut, loop=loop)
402 fut.add_done_callback(cb)
403
404 try:
405 if (yield from waiter):
406 return fut.result()
407 else:
408 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100409 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700410 raise futures.TimeoutError()
411 finally:
412 timeout_handle.cancel()
413
414
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait() and as_completed().

    The fs argument must be a non-empty collection of Futures.
    Returns a (done, pending) pair of sets.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = futures.Future(loop=loop)
    if timeout is None:
        timeout_handle = None
    else:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)

    def _on_completion(f):
        nonlocal counter
        counter -= 1
        # Keep waiting unless this completion satisfies return_when.
        if counter > 0 and return_when != FIRST_COMPLETED:
            if return_when != FIRST_EXCEPTION:
                return
            if f.cancelled() or f.exception() is None:
                return
        if timeout_handle is not None:
            timeout_handle.cancel()
        if not waiter.done():
            waiter.set_result(False)

    for pending_fut in fs:
        pending_fut.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()

    # Detach our callback and split the futures by completion state.
    done = set()
    pending = set()
    for fut in fs:
        fut.remove_done_callback(_on_completion)
        bucket = done if fut.done() else pending
        bucket.add(fut)
    return done, pending
457
458
459# This is *not* a @coroutine! It is just an iterator (yielding Futures).
460def as_completed(fs, *, loop=None, timeout=None):
461 """Return an iterator whose values, when waited for, are Futures.
462
463 This differs from PEP 3148; the proper way to use this is:
464
465 for f in as_completed(fs):
466 result = yield from f # The 'yield from' may raise.
467 # Use result.
468
469 Raises TimeoutError if the timeout occurs before all Futures are
470 done.
471
472 Note: The futures 'f' are not necessarily members of fs.
473 """
474 loop = loop if loop is not None else events.get_event_loop()
475 deadline = None if timeout is None else loop.time() + timeout
476 todo = set(async(f, loop=loop) for f in fs)
477 completed = collections.deque()
478
479 @coroutine
480 def _wait_for_one():
481 while not completed:
482 timeout = None
483 if deadline is not None:
484 timeout = deadline - loop.time()
485 if timeout < 0:
486 raise futures.TimeoutError()
487 done, pending = yield from _wait(
488 todo, timeout, FIRST_COMPLETED, loop)
489 # Multiple callers might be waiting for the same events
490 # and getting the same outcome. Dedupe by updating todo.
491 for f in done:
492 if f in todo:
493 todo.remove(f)
494 completed.append(f)
495 f = completed.popleft()
496 return f.result() # May raise.
497
498 for _ in range(len(todo)):
499 yield _wait_for_one()
500
501
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    The value of *result* is returned to the caller once the delay has
    elapsed.
    """
    waiter = futures.Future(loop=loop)
    timer = waiter._loop.call_later(delay, waiter.set_result, result)
    try:
        outcome = yield from waiter
    finally:
        # Always drop the timer, including when the wait is interrupted.
        timer.cancel()
    return outcome
511
512
513def async(coro_or_future, *, loop=None):
514 """Wrap a coroutine in a future.
515
516 If the argument is a Future, it is returned directly.
517 """
518 if isinstance(coro_or_future, futures.Future):
519 if loop is not None and loop is not coro_or_future._loop:
520 raise ValueError('loop argument must agree with Future')
521 return coro_or_future
522 elif iscoroutine(coro_or_future):
523 return Task(coro_or_future, loop=loop)
524 else:
525 raise TypeError('A Future or coroutine is required')
526
527
class _GatheringFuture(futures.Future):
    """Helper for gather().

    This overrides cancel() to cancel all the children and act more
    like Task.cancel(), which doesn't immediately mark itself as
    cancelled.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children

    def cancel(self):
        """Cancel every child; return False if this future is already done."""
        if not self.done():
            for child in self._children:
                child.cancel()
            return True
        return False
546
547
548def gather(*coros_or_futures, loop=None, return_exceptions=False):
549 """Return a future aggregating results from the given coroutines
550 or futures.
551
552 All futures must share the same event loop. If all the tasks are
553 done successfully, the returned future's result is the list of
554 results (in the order of the original sequence, not necessarily
555 the order of results arrival). If *result_exception* is True,
556 exceptions in the tasks are treated the same as successful
557 results, and gathered in the result list; otherwise, the first
558 raised exception will be immediately propagated to the returned
559 future.
560
561 Cancellation: if the outer Future is cancelled, all children (that
562 have not completed yet) are also cancelled. If any child is
563 cancelled, this is treated as if it raised CancelledError --
564 the outer Future is *not* cancelled in this case. (This is to
565 prevent the cancellation of one child to cause other children to
566 be cancelled.)
567 """
568 children = [async(fut, loop=loop) for fut in coros_or_futures]
569 n = len(children)
570 if n == 0:
571 outer = futures.Future(loop=loop)
572 outer.set_result([])
573 return outer
574 if loop is None:
575 loop = children[0]._loop
576 for fut in children:
577 if fut._loop is not loop:
578 raise ValueError("futures are tied to different event loops")
579 outer = _GatheringFuture(children, loop=loop)
580 nfinished = 0
581 results = [None] * n
582
583 def _done_callback(i, fut):
584 nonlocal nfinished
585 if outer._state != futures._PENDING:
586 if fut._exception is not None:
587 # Mark exception retrieved.
588 fut.exception()
589 return
590 if fut._state == futures._CANCELLED:
591 res = futures.CancelledError()
592 if not return_exceptions:
593 outer.set_exception(res)
594 return
595 elif fut._exception is not None:
596 res = fut.exception() # Mark exception retrieved.
597 if not return_exceptions:
598 outer.set_exception(res)
599 return
600 else:
601 res = fut._result
602 results[i] = res
603 nfinished += 1
604 if nfinished == n:
605 outer.set_result(results)
606
607 for i, fut in enumerate(children):
608 fut.add_done_callback(functools.partial(_done_callback, i))
609 return outer
610
611
612def shield(arg, *, loop=None):
613 """Wait for a future, shielding it from cancellation.
614
615 The statement
616
617 res = yield from shield(something())
618
619 is exactly equivalent to the statement
620
621 res = yield from something()
622
623 *except* that if the coroutine containing it is cancelled, the
624 task running in something() is not cancelled. From the POV of
625 something(), the cancellation did not happen. But its caller is
626 still cancelled, so the yield-from expression still raises
627 CancelledError. Note: If something() is cancelled by other means
628 this will still cancel shield().
629
630 If you want to completely ignore cancellation (not recommended)
631 you can combine shield() with a try/except clause, as follows:
632
633 try:
634 res = yield from shield(something())
635 except CancelledError:
636 res = None
637 """
638 inner = async(arg, loop=loop)
639 if inner.done():
640 # Shortcut.
641 return inner
642 loop = inner._loop
643 outer = futures.Future(loop=loop)
644
645 def _done_callback(inner):
646 if outer.cancelled():
647 # Mark inner's result as retrieved.
648 inner.cancelled() or inner.exception()
649 return
650 if inner.cancelled():
651 outer.cancel()
652 else:
653 exc = inner.exception()
654 if exc is not None:
655 outer.set_exception(exc)
656 else:
657 outer.set_result(inner.result())
658
659 inner.add_done_callback(_done_callback)
660 return outer