blob: db0bbf3acd8c8ebd921cf3125665fed8100dac31 [file] [log] [blame]
"""Support for tasks, coroutines and the scheduler."""

# Public names of this module (what a star-import exposes).
__all__ = ['coroutine', 'Task',
           'iscoroutinefunction', 'iscoroutine',
           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
           'wait', 'wait_for', 'as_completed', 'sleep', 'async',
           'gather', 'shield',
           ]
9
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010import concurrent.futures
11import functools
12import inspect
13import linecache
Victor Stinner7ef60cd2014-02-19 23:15:02 +010014import os
15import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import traceback
17import weakref
18
19from . import events
20from . import futures
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
23# If you set _DEBUG to true, @coroutine will wrap the resulting
24# generator objects in a CoroWrapper instance (defined below). That
25# instance will log a message when the generator is never iterated
26# over, which may happen when you forget to use "yield from" with a
27# coroutine call. Note that the value of the _DEBUG flag is taken
28# when the decorator is used, so to be of any use it must be set
29# before you define your coroutines. A downside of using this feature
30# is that tracebacks show entries for the CoroWrapper.__next__ method
31# when _DEBUG is true.
# Debug mode is on when the PYTHONASYNCIODEBUG environment variable is set
# to a non-empty value, unless the interpreter was started with the
# "ignore environment" flag.
_DEBUG = (not sys.flags.ignore_environment
          and bool(os.environ.get('PYTHONASYNCIODEBUG')))

# Interpreter version checks used to select version-specific behavior.
_PY34 = (sys.version_info >= (3, 4))
_PY35 = (sys.version_info >= (3, 5))
37
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038
class CoroWrapper:
    """Wrapper for a coroutine's generator object, used in _DEBUG mode.

    The wrapper forwards the generator protocol (iteration, send(),
    throw(), close()) and the gi_* introspection attributes to the
    wrapped generator.  It records the stack at creation time so that,
    if the generator is destroyed without ever having been started, an
    error pointing at the place where it was created can be logged.
    """

    def __init__(self, gen, func):
        """Wrap generator object *gen* produced by calling *func*."""
        assert inspect.isgenerator(gen), gen
        self.gen = gen
        self.func = func
        # Stack of the caller, reported by __del__() if the coroutine
        # is never iterated.
        self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __iter__(self):
        return self

    def __next__(self):
        return next(self.gen)

    def send(self, *value):
        """Send a value into the wrapped generator."""
        # We use `*value` because of a bug in CPythons prior
        # to 3.4.1. See issue #21209 and test_yield_from_corowrapper
        # for details.  This workaround should be removed in 3.5.0.
        if len(value) == 1:
            value = value[0]
        return self.gen.send(value)

    def throw(self, exc, value=None, tb=None):
        """Raise an exception inside the wrapped generator.

        Accepts the same forms as generator.throw(): either a single
        exception (instance or class), or the (type, value, traceback)
        triple.  The extra arguments are only forwarded when given, so
        the single-argument call behaves exactly as before.
        """
        if value is None and tb is None:
            return self.gen.throw(exc)
        return self.gen.throw(exc, value, tb)

    def close(self):
        return self.gen.close()

    @property
    def gi_frame(self):
        return self.gen.gi_frame

    @property
    def gi_running(self):
        return self.gen.gi_running

    @property
    def gi_code(self):
        return self.gen.gi_code

    def __del__(self):
        # Be careful accessing self.gen.frame -- self.gen might not exist.
        gen = getattr(self, 'gen', None)
        frame = getattr(gen, 'gi_frame', None)
        # f_lasti == -1 means the generator has a frame but has not
        # executed a single instruction: it was never iterated.
        if frame is not None and frame.f_lasti == -1:
            func = events._format_callback(self.func, ())
            tb = ''.join(traceback.format_list(self._source_traceback))
            message = ('Coroutine %s was never yielded from\n'
                       'Coroutine object created at (most recent call last):\n'
                       '%s'
                       % (func, tb.rstrip()))
            logger.error(message)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070092
93
def coroutine(func):
    """Decorator to mark coroutines.

    If *func* is a generator function it is used as is; otherwise it is
    wrapped in a generator function that calls it and, when the result
    is a Future, a generator or a CoroWrapper, delegates to that result
    with "yield from".

    In _DEBUG mode the returned callable wraps each generator object in
    a CoroWrapper, so that if the coroutine is not yielded from before
    it is destroyed, an error message is logged.
    """
    if inspect.isgeneratorfunction(func):
        coro = func
    else:
        @functools.wraps(func)
        def coro(*args, **kw):
            res = func(*args, **kw)
            # A decorated coroutine called in debug mode returns a
            # CoroWrapper rather than a plain generator; it must be
            # delegated to as well, otherwise the unstarted wrapper
            # would become the result.
            if (isinstance(res, futures.Future)
                    or inspect.isgenerator(res)
                    or isinstance(res, CoroWrapper)):
                res = yield from res
            return res

    if not _DEBUG:
        wrapper = coro
    else:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            w = CoroWrapper(coro(*args, **kwds), func)
            # Drop the last recorded stack entry (the one for this
            # wrapper call) from the creation traceback.
            if w._source_traceback:
                del w._source_traceback[-1]
            w.__name__ = func.__name__
            if _PY35:
                w.__qualname__ = func.__qualname__
            w.__doc__ = func.__doc__
            return w

    wrapper._is_coroutine = True  # For iscoroutinefunction().
    return wrapper
126
127
def iscoroutinefunction(func):
    """Tell whether *func* carries the marker set by @coroutine.

    Returns the value of the function's ``_is_coroutine`` attribute, or
    False when the attribute is absent.
    """
    try:
        return func._is_coroutine
    except AttributeError:
        return False
131
132
def iscoroutine(obj):
    """Tell whether *obj* is a coroutine object.

    A coroutine object is either a CoroWrapper instance or a generator
    object.
    """
    if isinstance(obj, CoroWrapper):
        return True
    return inspect.isgenerator(obj)
136
137
def _format_coroutine(coro):
    """Return a short description of *coro* for use in repr() output.

    The description is "name() at file:line" using the current line
    while the coroutine still has a frame, and "name() done at
    file:line" using the first line of its code once the frame is gone.
    """
    assert iscoroutine(coro)
    name = coro.__qualname__ if _PY35 else coro.__name__

    code = coro.gi_code
    path = code.co_filename
    frame = coro.gi_frame
    if frame is None:
        return '%s() done at %s:%s' % (name, path, code.co_firstlineno)
    return '%s() at %s:%s' % (name, path, frame.f_lineno)
152
153
class Task(futures.Future):
    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task is not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup(). When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # Weak set containing all tasks alive.
    _all_tasks = weakref.WeakSet()

    # Dictionary containing tasks that are currently active in
    # all running event loops. {EventLoop: Task}
    _current_tasks = {}

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        if loop is None:
            loop = events.get_event_loop()
        return cls._current_tasks.get(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        if loop is None:
            loop = events.get_event_loop()
        return {t for t in cls._all_tasks if t._loop is loop}

    def __init__(self, coro, *, loop=None):
        """Wrap coroutine object *coro* and schedule its first step.

        The first call to _step() is queued on the loop with call_soon()
        and the task is registered in the class-wide set of tasks.
        """
        assert iscoroutine(coro), repr(coro)  # Not a coroutine function!
        super().__init__(loop=loop)
        # Drop the last entry of the creation traceback, if one was
        # recorded -- presumably the frame of this constructor.
        if self._source_traceback:
            del self._source_traceback[-1]
        self._coro = iter(coro)  # Use the iterator just in case.
        self._fut_waiter = None
        self._must_cancel = False
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    # On Python 3.3 or older, objects with a destructor that are part of a
    # reference cycle are never destroyed. That is no longer the case on
    # Python 3.4, thanks to PEP 442.
    if _PY34:
        def __del__(self):
            """Report a task that is destroyed while still pending."""
            if self._state == futures._PENDING:
                context = {
                    'task': self,
                    'message': 'Task was destroyed but it is pending!',
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
            futures.Future.__del__(self)

    def __repr__(self):
        """Return '<ClassName state coroutine [result] [callbacks]>'."""
        info = []
        # A pending cancellation request takes precedence over the state.
        if self._must_cancel:
            info.append('cancelling')
        else:
            info.append(self._state.lower())

        info.append(_format_coroutine(self._coro))

        if self._state == futures._FINISHED:
            info.append(self._format_result())

        if self._callbacks:
            info.append(self._format_callbacks())

        return '<%s %s>' % (self.__class__.__name__, ' '.join(info))

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is active, this returns the stack where it is
        suspended. If the coroutine has completed successfully or was
        cancelled, this returns an empty list. If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned. Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned. (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        frames = []
        f = self._coro.gi_frame
        if f is not None:
            # Walk from the newest frame back through f_back, then
            # reverse so that the oldest frame comes first.
            while f is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(f)
                f = f.f_back
            frames.reverse()
        elif self._exception is not None:
            # Traceback entries are already linked oldest to newest.
            tb = self._exception.__traceback__
            while tb is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(tb.tb_frame)
                tb = tb.tb_next
        return frames

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack(). The limit argument
        is passed to get_stack(). The file argument is an I/O stream
        to which the output goes; by default it goes to sys.stderr.
        """
        extracted_list = []
        # Files whose linecache entry has already been refreshed.
        checked = set()
        for f in self.get_stack(limit=limit):
            lineno = f.f_lineno
            co = f.f_code
            filename = co.co_filename
            name = co.co_name
            if filename not in checked:
                checked.add(filename)
                linecache.checkcache(filename)
            line = linecache.getline(filename, lineno, f.f_globals)
            extracted_list.append((filename, lineno, name, line))
        exc = self._exception
        if not extracted_list:
            print('No stack for %r' % self, file=file)
        elif exc is not None:
            print('Traceback for %r (most recent call last):' % self,
                  file=file)
        else:
            print('Stack for %r (most recent call last):' % self,
                  file=file)
        traceback.print_list(extracted_list, file=file)
        if exc is not None:
            for line in traceback.format_exception_only(exc.__class__, exc):
                print(line, file=file, end='')

    def cancel(self):
        """Request this task to cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Contrary to Future.cancel(), this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing it
        completely. The task may also return a value or raise a
        different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled). A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).
        """
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, value=None, exc=None):
        """Advance the wrapped coroutine by one step.

        Throws *exc* into the coroutine if given, otherwise sends
        *value* if it is not None, otherwise calls next() on it.  The
        outcome either completes the task (result, exception or
        cancellation) or determines how the next step gets scheduled.
        """
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
        # A pending cancellation request overrides whatever was going
        # to be delivered to the coroutine.
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        # Record this task as the one running on its loop for the
        # duration of the step; undone in the finally clause below.
        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(value).
        try:
            if exc is not None:
                result = coro.throw(exc)
            elif value is not None:
                result = coro.send(value)
            else:
                result = next(coro)
        except StopIteration as exc:
            # The coroutine returned; its return value is the result.
            self.set_result(exc.value)
        except futures.CancelledError as exc:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            # Store the exception on the task, but let it propagate too.
            self.set_exception(exc)
            raise
        else:
            if isinstance(result, futures.Future):
                # Yielded Future must come from Future.__iter__().
                if result._blocking:
                    result._blocking = False
                    result.add_done_callback(self._wakeup)
                    self._fut_waiter = result
                    # cancel() may have been called while the coroutine
                    # was running; forward the request to the Future
                    # now being waited on.
                    if self._must_cancel:
                        if self._fut_waiter.cancel():
                            self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step, None,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        """Done-callback for the awaited Future: resume the coroutine.

        The Future's result is passed on as the value of the next
        step; an exception raised by result() is passed on as the
        exception to throw into the coroutine.
        """
        try:
            value = future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(None, exc)
        else:
            self._step(value, None)
        self = None  # Needed to break cycles when an exception occurs.
421
422
423# wait() and as_completed() similar to those in PEP 3148.
424
# Values accepted for the return_when argument of wait(); they are the
# very same objects as the constants of concurrent.futures.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
428
429
430@coroutine
431def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
432 """Wait for the Futures and coroutines given by fs to complete.
433
Victor Stinnerdb74d982014-06-10 11:16:05 +0200434 The sequence futures must not be empty.
435
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700436 Coroutines will be wrapped in Tasks.
437
438 Returns two sets of Future: (done, pending).
439
440 Usage:
441
442 done, pending = yield from asyncio.wait(fs)
443
444 Note: This does not raise TimeoutError! Futures that aren't done
445 when the timeout occurs are returned in the second set.
446 """
Victor Stinnereb748762014-02-11 11:54:08 +0100447 if isinstance(fs, futures.Future) or iscoroutine(fs):
448 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700449 if not fs:
450 raise ValueError('Set of coroutines/Futures is empty.')
451
452 if loop is None:
453 loop = events.get_event_loop()
454
Yury Selivanov622be342014-02-06 22:06:16 -0500455 fs = {async(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700456
457 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
458 raise ValueError('Invalid return_when value: {}'.format(return_when))
459 return (yield from _wait(fs, timeout, return_when, loop))
460
461
462def _release_waiter(waiter, value=True, *args):
463 if not waiter.done():
464 waiter.set_result(value)
465
466
467@coroutine
468def wait_for(fut, timeout, *, loop=None):
469 """Wait for the single Future or coroutine to complete, with timeout.
470
471 Coroutine will be wrapped in Task.
472
Victor Stinner421e49b2014-01-23 17:40:59 +0100473 Returns result of the Future or coroutine. When a timeout occurs,
474 it cancels the task and raises TimeoutError. To avoid the task
475 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700476
477 Usage:
478
479 result = yield from asyncio.wait_for(fut, 10.0)
480
481 """
482 if loop is None:
483 loop = events.get_event_loop()
484
Guido van Rossum48c66c32014-01-29 14:30:38 -0800485 if timeout is None:
486 return (yield from fut)
487
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700488 waiter = futures.Future(loop=loop)
489 timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False)
490 cb = functools.partial(_release_waiter, waiter, True)
491
492 fut = async(fut, loop=loop)
493 fut.add_done_callback(cb)
494
495 try:
496 if (yield from waiter):
497 return fut.result()
498 else:
499 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100500 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700501 raise futures.TimeoutError()
502 finally:
503 timeout_handle.cancel()
504
505
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a non-empty collection of Futures.
    Returns the (done, pending) pair of sets once the return_when
    condition is met or the timeout expires.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = futures.Future(loop=loop)
    timeout_handle = (loop.call_later(timeout, _release_waiter, waiter)
                      if timeout is not None else None)
    counter = len(fs)

    def _on_completion(f):
        nonlocal counter
        counter -= 1
        # Decide whether this completion ends the wait.
        finished = counter <= 0 or return_when == FIRST_COMPLETED
        if not finished and return_when == FIRST_EXCEPTION:
            finished = not f.cancelled() and f.exception() is not None
        if not finished:
            return
        if timeout_handle is not None:
            timeout_handle.cancel()
        if not waiter.done():
            waiter.set_result(False)

    for pending_fut in fs:
        pending_fut.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()

    # Detach from every future and sort each into the matching set.
    done = set()
    pending = set()
    for fut in fs:
        fut.remove_done_callback(_on_completion)
        (done if fut.done() else pending).add(fut)
    return done, pending
547 return done, pending
548
549
550# This is *not* a @coroutine! It is just an iterator (yielding Futures).
551def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800552 """Return an iterator whose values are coroutines.
553
554 When waiting for the yielded coroutines you'll get the results (or
555 exceptions!) of the original Futures (or coroutines), in the order
556 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700557
558 This differs from PEP 3148; the proper way to use this is:
559
560 for f in as_completed(fs):
561 result = yield from f # The 'yield from' may raise.
562 # Use result.
563
Guido van Rossumb58f0532014-02-12 17:58:19 -0800564 If a timeout is specified, the 'yield from' will raise
565 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700566
567 Note: The futures 'f' are not necessarily members of fs.
568 """
Victor Stinnereb748762014-02-11 11:54:08 +0100569 if isinstance(fs, futures.Future) or iscoroutine(fs):
570 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700571 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov622be342014-02-06 22:06:16 -0500572 todo = {async(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800573 from .queues import Queue # Import here to avoid circular import problem.
574 done = Queue(loop=loop)
575 timeout_handle = None
576
577 def _on_timeout():
578 for f in todo:
579 f.remove_done_callback(_on_completion)
580 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
581 todo.clear() # Can't do todo.remove(f) in the loop.
582
583 def _on_completion(f):
584 if not todo:
585 return # _on_timeout() was here first.
586 todo.remove(f)
587 done.put_nowait(f)
588 if not todo and timeout_handle is not None:
589 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700590
591 @coroutine
592 def _wait_for_one():
Guido van Rossumb58f0532014-02-12 17:58:19 -0800593 f = yield from done.get()
594 if f is None:
595 # Dummy value from _on_timeout().
596 raise futures.TimeoutError
597 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700598
Guido van Rossumb58f0532014-02-12 17:58:19 -0800599 for f in todo:
600 f.add_done_callback(_on_completion)
601 if todo and timeout is not None:
602 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700603 for _ in range(len(todo)):
604 yield _wait_for_one()
605
606
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after *delay* seconds, returning *result*."""
    fut = futures.Future(loop=loop)
    timer = fut._loop.call_later(delay, fut.set_result, result)
    try:
        outcome = yield from fut
        return outcome
    finally:
        # Always drop the timer, also when the wait is interrupted.
        timer.cancel()
615 h.cancel()
616
617
618def async(coro_or_future, *, loop=None):
619 """Wrap a coroutine in a future.
620
621 If the argument is a Future, it is returned directly.
622 """
623 if isinstance(coro_or_future, futures.Future):
624 if loop is not None and loop is not coro_or_future._loop:
625 raise ValueError('loop argument must agree with Future')
626 return coro_or_future
627 elif iscoroutine(coro_or_future):
Victor Stinner80f53aa2014-06-27 13:52:20 +0200628 task = Task(coro_or_future, loop=loop)
629 if task._source_traceback:
630 del task._source_traceback[-1]
631 return task
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700632 else:
633 raise TypeError('A Future or coroutine is required')
634
635
class _GatheringFuture(futures.Future):
    """Future returned by gather().

    Its cancel() cancels all the children instead of marking this
    future as cancelled right away, which makes it behave more like
    Task.cancel().
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children

    def cancel(self):
        """Cancel every child; return False if already done, else True."""
        if not self.done():
            for child in self._children:
                child.cancel()
            return True
        return False
653 return True
654
655
656def gather(*coros_or_futures, loop=None, return_exceptions=False):
657 """Return a future aggregating results from the given coroutines
658 or futures.
659
660 All futures must share the same event loop. If all the tasks are
661 done successfully, the returned future's result is the list of
662 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500663 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700664 exceptions in the tasks are treated the same as successful
665 results, and gathered in the result list; otherwise, the first
666 raised exception will be immediately propagated to the returned
667 future.
668
669 Cancellation: if the outer Future is cancelled, all children (that
670 have not completed yet) are also cancelled. If any child is
671 cancelled, this is treated as if it raised CancelledError --
672 the outer Future is *not* cancelled in this case. (This is to
673 prevent the cancellation of one child to cause other children to
674 be cancelled.)
675 """
Yury Selivanov622be342014-02-06 22:06:16 -0500676 arg_to_fut = {arg: async(arg, loop=loop) for arg in set(coros_or_futures)}
677 children = [arg_to_fut[arg] for arg in coros_or_futures]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700678 n = len(children)
679 if n == 0:
680 outer = futures.Future(loop=loop)
681 outer.set_result([])
682 return outer
683 if loop is None:
684 loop = children[0]._loop
685 for fut in children:
686 if fut._loop is not loop:
687 raise ValueError("futures are tied to different event loops")
688 outer = _GatheringFuture(children, loop=loop)
689 nfinished = 0
690 results = [None] * n
691
692 def _done_callback(i, fut):
693 nonlocal nfinished
694 if outer._state != futures._PENDING:
695 if fut._exception is not None:
696 # Mark exception retrieved.
697 fut.exception()
698 return
699 if fut._state == futures._CANCELLED:
700 res = futures.CancelledError()
701 if not return_exceptions:
702 outer.set_exception(res)
703 return
704 elif fut._exception is not None:
705 res = fut.exception() # Mark exception retrieved.
706 if not return_exceptions:
707 outer.set_exception(res)
708 return
709 else:
710 res = fut._result
711 results[i] = res
712 nfinished += 1
713 if nfinished == n:
714 outer.set_result(results)
715
716 for i, fut in enumerate(children):
717 fut.add_done_callback(functools.partial(_done_callback, i))
718 return outer
719
720
721def shield(arg, *, loop=None):
722 """Wait for a future, shielding it from cancellation.
723
724 The statement
725
726 res = yield from shield(something())
727
728 is exactly equivalent to the statement
729
730 res = yield from something()
731
732 *except* that if the coroutine containing it is cancelled, the
733 task running in something() is not cancelled. From the POV of
734 something(), the cancellation did not happen. But its caller is
735 still cancelled, so the yield-from expression still raises
736 CancelledError. Note: If something() is cancelled by other means
737 this will still cancel shield().
738
739 If you want to completely ignore cancellation (not recommended)
740 you can combine shield() with a try/except clause, as follows:
741
742 try:
743 res = yield from shield(something())
744 except CancelledError:
745 res = None
746 """
747 inner = async(arg, loop=loop)
748 if inner.done():
749 # Shortcut.
750 return inner
751 loop = inner._loop
752 outer = futures.Future(loop=loop)
753
754 def _done_callback(inner):
755 if outer.cancelled():
756 # Mark inner's result as retrieved.
757 inner.cancelled() or inner.exception()
758 return
759 if inner.cancelled():
760 outer.cancel()
761 else:
762 exc = inner.exception()
763 if exc is not None:
764 outer.set_exception(exc)
765 else:
766 outer.set_result(inner.result())
767
768 inner.add_done_callback(_done_callback)
769 return outer