blob: 89ec3a4caa75072ebc92003a08df5d69c2d82883 [file] [log] [blame]
"""Support for tasks, coroutines and the scheduler."""

# Public names exported by ``from <this module> import *``.
__all__ = ['coroutine', 'Task',
           'iscoroutinefunction', 'iscoroutine',
           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
           'wait', 'wait_for', 'as_completed', 'sleep', 'async',
           'gather', 'shield',
           ]
9
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010import concurrent.futures
11import functools
12import inspect
13import linecache
Victor Stinner7ef60cd2014-02-19 23:15:02 +010014import os
15import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import traceback
17import weakref
18
19from . import events
20from . import futures
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
# If you set _DEBUG to true, @coroutine will wrap the resulting
# generator objects in a CoroWrapper instance (defined below).  That
# instance will log a message when the generator is never iterated
# over, which may happen when you forget to use "yield from" with a
# coroutine call.  Note that the value of the _DEBUG flag is taken
# when the decorator is used, so to be of any use it must be set
# before you define your coroutines.  A downside of using this feature
# is that tracebacks show entries for the CoroWrapper.__next__ method
# when _DEBUG is true.
#
# The flag is enabled by a non-empty PYTHONASYNCIODEBUG environment
# variable, unless the interpreter was told to ignore the environment.
_DEBUG = (not sys.flags.ignore_environment
          and bool(os.environ.get('PYTHONASYNCIODEBUG')))

# Interpreter feature gates used further down in this module.
_PY34 = (sys.version_info >= (3, 4))
_PY35 = (sys.version_info >= (3, 5))
37
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038
class CoroWrapper:
    """Wrapper for a coroutine's generator object, used in _DEBUG mode.

    The wrapper forwards the generator protocol (iteration, send(),
    throw(), close() and the gi_* attributes) to the wrapped generator.
    When the wrapper is destroyed while the generator has never been
    started, an error is logged together with the stack recorded when
    the wrapper was created.
    """

    def __init__(self, gen, func):
        """Wrap generator object *gen* produced by calling *func*."""
        assert inspect.isgenerator(gen), gen
        self.gen = gen
        self.func = func
        # Stack of our caller, kept so that __del__ can report where
        # the coroutine object was created.
        self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __iter__(self):
        return self

    def __next__(self):
        return next(self.gen)

    def send(self, *value):
        # We use `*value` because of a bug in CPythons prior
        # to 3.4.1.  See issue #21209 and test_yield_from_corowrapper
        # for details.  This workaround should be removed in 3.5.0.
        if len(value) == 1:
            value = value[0]
        return self.gen.send(value)

    def throw(self, exc, value=None, tb=None):
        """Raise an exception inside the wrapped generator.

        Accepts either a single exception (instance or class) or the
        ``(type, value, traceback)`` form, like generator.throw().
        Returns whatever the wrapped generator's throw() returns.
        """
        if value is None and tb is None:
            # Single-argument form; do not forward the unused defaults.
            return self.gen.throw(exc)
        return self.gen.throw(exc, value, tb)

    def close(self):
        return self.gen.close()

    @property
    def gi_frame(self):
        return self.gen.gi_frame

    @property
    def gi_running(self):
        return self.gen.gi_running

    @property
    def gi_code(self):
        return self.gen.gi_code

    def __del__(self):
        # Be careful accessing self.gen.frame -- self.gen might not exist.
        gen = getattr(self, 'gen', None)
        frame = getattr(gen, 'gi_frame', None)
        # f_lasti == -1 means no instruction of the generator has run
        # yet, i.e. nobody ever iterated over it.
        if frame is not None and frame.f_lasti == -1:
            func = events._format_callback(self.func, ())
            tb = ''.join(traceback.format_list(self._source_traceback))
            message = ('Coroutine %s was never yielded from\n'
                       'Coroutine object created at (most recent call last):\n'
                       '%s'
                       % (func, tb.rstrip()))
            logger.error(message)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070092
93
def coroutine(func):
    """Decorator to mark coroutines.

    A generator function is used as is; any other callable is wrapped
    in a generator function that "yield from"s its result when that
    result is a Future or a generator, and returns the result.

    When _DEBUG is true at decoration time, calling the decorated
    function returns a CoroWrapper, which logs an error message if the
    coroutine is not yielded from before it is destroyed.

    The returned function carries ``_is_coroutine = True`` so that
    iscoroutinefunction() recognizes it.
    """
    if inspect.isgeneratorfunction(func):
        coro = func
    else:
        @functools.wraps(func)
        def coro(*args, **kw):
            res = func(*args, **kw)
            if isinstance(res, futures.Future) or inspect.isgenerator(res):
                res = yield from res
            return res

    if not _DEBUG:
        wrapper = coro
    else:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            w = CoroWrapper(coro(*args, **kwds), func)
            if w._source_traceback:
                # Drop the entry for this wrapper function itself so the
                # recorded stack ends at the user's call site.
                del w._source_traceback[-1]
            w.__name__ = func.__name__
            if _PY35:
                w.__qualname__ = func.__qualname__
            w.__doc__ = func.__doc__
            return w

    wrapper._is_coroutine = True  # For iscoroutinefunction().
    return wrapper
126
127
def iscoroutinefunction(func):
    """Return True if func is a decorated coroutine function.

    The check reads the ``_is_coroutine`` attribute that @coroutine sets
    on the functions it returns; False is returned when it is missing.
    """
    return getattr(func, '_is_coroutine', False)
131
132
def iscoroutine(obj):
    """Return True if obj is a coroutine object.

    A coroutine object is either a CoroWrapper instance or a generator
    object.
    """
    return isinstance(obj, CoroWrapper) or inspect.isgenerator(obj)
136
137
def _format_coroutine(coro):
    """Return a short description of coroutine object *coro*.

    The result is ``'name() at file:line'`` with the current line while
    the coroutine still has a frame, or ``'name() done at file:line'``
    with the first line of its code object once the frame is gone.
    """
    assert iscoroutine(coro)
    if _PY35:
        coro_name = coro.__qualname__
    else:
        coro_name = coro.__name__

    filename = coro.gi_code.co_filename
    if coro.gi_frame is not None:
        lineno = coro.gi_frame.f_lineno
        return '%s() at %s:%s' % (coro_name, filename, lineno)
    else:
        lineno = coro.gi_code.co_firstlineno
        return '%s() done at %s:%s' % (coro_name, filename, lineno)
152
153
class Task(futures.Future):
    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # Weak set containing all tasks alive.
    _all_tasks = weakref.WeakSet()

    # Dictionary containing tasks that are currently active in
    # all running event loops.  {EventLoop: Task}
    _current_tasks = {}

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        if loop is None:
            loop = events.get_event_loop()
        return cls._current_tasks.get(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        if loop is None:
            loop = events.get_event_loop()
        return {t for t in cls._all_tasks if t._loop is loop}

    def __init__(self, coro, *, loop=None):
        """Wrap coroutine object *coro* and schedule its first step."""
        assert iscoroutine(coro), repr(coro)  # Not a coroutine function!
        super().__init__(loop=loop)
        self._coro = iter(coro)  # Use the iterator just in case.
        self._fut_waiter = None
        self._must_cancel = False
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    # On Python 3.3 or older, objects with a destructor that are part of
    # a reference cycle are never destroyed.  This is no longer the case
    # on Python 3.4 thanks to PEP 442.
    if _PY34:
        def __del__(self):
            if self._state == futures._PENDING:
                self._loop.call_exception_handler({
                    'task': self,
                    'message': 'Task was destroyed but it is pending!',
                })
            futures.Future.__del__(self)

    def __repr__(self):
        info = []
        if self._must_cancel:
            info.append('cancelling')
        else:
            info.append(self._state.lower())

        info.append(_format_coroutine(self._coro))

        if self._state == futures._FINISHED:
            info.append(self._format_result())

        if self._callbacks:
            info.append(self._format_callbacks())

        return '<%s %s>' % (self.__class__.__name__, ' '.join(info))

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is active, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        frames = []
        f = self._coro.gi_frame
        if f is not None:
            # Walk from the newest frame outwards, then flip the list so
            # that it is ordered oldest to newest.
            while f is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(f)
                f = f.f_back
            frames.reverse()
        elif self._exception is not None:
            # Traceback entries are already linked oldest to newest.
            tb = self._exception.__traceback__
            while tb is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(tb.tb_frame)
                tb = tb.tb_next
        return frames

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output goes; by default it goes to sys.stderr.
        """
        extracted_list = []
        checked = set()
        for f in self.get_stack(limit=limit):
            lineno = f.f_lineno
            co = f.f_code
            filename = co.co_filename
            name = co.co_name
            if filename not in checked:
                # Refresh the line cache once per file.
                checked.add(filename)
                linecache.checkcache(filename)
            line = linecache.getline(filename, lineno, f.f_globals)
            extracted_list.append((filename, lineno, name, line))
        exc = self._exception
        if not extracted_list:
            print('No stack for %r' % self, file=file)
        elif exc is not None:
            print('Traceback for %r (most recent call last):' % self,
                  file=file)
        else:
            print('Stack for %r (most recent call last):' % self,
                  file=file)
        traceback.print_list(extracted_list, file=file)
        if exc is not None:
            for line in traceback.format_exception_only(exc.__class__, exc):
                print(line, file=file, end='')

    def cancel(self):
        """Request this task to cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Contrary to Future.cancel(), this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing it
        completely.  The task may also return a value or raise a
        different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).
        """
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, value=None, exc=None):
        """Advance the wrapped coroutine by one step.

        Throws *exc* into the coroutine if it is given, otherwise sends
        *value* (or calls next() when value is None).  Depending on the
        outcome the task is completed, cancelled, made to wait on the
        yielded Future, or rescheduled.
        """
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(value).
        try:
            if exc is not None:
                result = coro.throw(exc)
            elif value is not None:
                result = coro.send(value)
            else:
                result = next(coro)
        except StopIteration as exc:
            self.set_result(exc.value)
        except futures.CancelledError as exc:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            self.set_exception(exc)
            raise
        else:
            if isinstance(result, futures.Future):
                # Yielded Future must come from Future.__iter__().
                if result._blocking:
                    result._blocking = False
                    result.add_done_callback(self._wakeup)
                    self._fut_waiter = result
                    if self._must_cancel:
                        if self._fut_waiter.cancel():
                            self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step, None,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
        self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        """Done-callback on the awaited future: resume the coroutine.

        The future's result is passed to _step() as the value to send,
        or its exception as the exception to throw.
        """
        try:
            value = future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(None, exc)
        else:
            self._step(value, None)
        self = None  # Needed to break cycles when an exception occurs.
416
417
# wait() and as_completed() similar to those in PEP 3148.

# Accepted values for the return_when argument of wait(); they are the
# same objects as the constants in concurrent.futures.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
423
424
425@coroutine
426def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
427 """Wait for the Futures and coroutines given by fs to complete.
428
Victor Stinnerdb74d982014-06-10 11:16:05 +0200429 The sequence futures must not be empty.
430
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700431 Coroutines will be wrapped in Tasks.
432
433 Returns two sets of Future: (done, pending).
434
435 Usage:
436
437 done, pending = yield from asyncio.wait(fs)
438
439 Note: This does not raise TimeoutError! Futures that aren't done
440 when the timeout occurs are returned in the second set.
441 """
Victor Stinnereb748762014-02-11 11:54:08 +0100442 if isinstance(fs, futures.Future) or iscoroutine(fs):
443 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700444 if not fs:
445 raise ValueError('Set of coroutines/Futures is empty.')
446
447 if loop is None:
448 loop = events.get_event_loop()
449
Yury Selivanov622be342014-02-06 22:06:16 -0500450 fs = {async(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700451
452 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
453 raise ValueError('Invalid return_when value: {}'.format(return_when))
454 return (yield from _wait(fs, timeout, return_when, loop))
455
456
def _release_waiter(waiter, value=True, *args):
    """Set *value* as the result of *waiter* unless it is already done.

    Extra positional arguments are accepted and ignored, which lets the
    function be used as a done-callback that is called with a future.
    """
    if not waiter.done():
        waiter.set_result(value)
460
461
462@coroutine
463def wait_for(fut, timeout, *, loop=None):
464 """Wait for the single Future or coroutine to complete, with timeout.
465
466 Coroutine will be wrapped in Task.
467
Victor Stinner421e49b2014-01-23 17:40:59 +0100468 Returns result of the Future or coroutine. When a timeout occurs,
469 it cancels the task and raises TimeoutError. To avoid the task
470 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700471
472 Usage:
473
474 result = yield from asyncio.wait_for(fut, 10.0)
475
476 """
477 if loop is None:
478 loop = events.get_event_loop()
479
Guido van Rossum48c66c32014-01-29 14:30:38 -0800480 if timeout is None:
481 return (yield from fut)
482
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700483 waiter = futures.Future(loop=loop)
484 timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False)
485 cb = functools.partial(_release_waiter, waiter, True)
486
487 fut = async(fut, loop=loop)
488 fut.add_done_callback(cb)
489
490 try:
491 if (yield from waiter):
492 return fut.result()
493 else:
494 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100495 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700496 raise futures.TimeoutError()
497 finally:
498 timeout_handle.cancel()
499
500
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a collection of Futures.

    Returns two sets (done, pending) once the return_when condition is
    met or the timeout, if any, has expired.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = futures.Future(loop=loop)
    timeout_handle = None
    if timeout is not None:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)

    def _on_completion(f):
        # Wake the waiter when every future is done, or earlier when the
        # return_when condition is satisfied by this future.
        nonlocal counter
        counter -= 1
        if (counter <= 0 or
            return_when == FIRST_COMPLETED or
            return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                f.exception() is not None)):
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(False)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()

    done, pending = set(), set()
    for f in fs:
        f.remove_done_callback(_on_completion)
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending
543
544
545# This is *not* a @coroutine! It is just an iterator (yielding Futures).
546def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800547 """Return an iterator whose values are coroutines.
548
549 When waiting for the yielded coroutines you'll get the results (or
550 exceptions!) of the original Futures (or coroutines), in the order
551 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700552
553 This differs from PEP 3148; the proper way to use this is:
554
555 for f in as_completed(fs):
556 result = yield from f # The 'yield from' may raise.
557 # Use result.
558
Guido van Rossumb58f0532014-02-12 17:58:19 -0800559 If a timeout is specified, the 'yield from' will raise
560 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700561
562 Note: The futures 'f' are not necessarily members of fs.
563 """
Victor Stinnereb748762014-02-11 11:54:08 +0100564 if isinstance(fs, futures.Future) or iscoroutine(fs):
565 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700566 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov622be342014-02-06 22:06:16 -0500567 todo = {async(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800568 from .queues import Queue # Import here to avoid circular import problem.
569 done = Queue(loop=loop)
570 timeout_handle = None
571
572 def _on_timeout():
573 for f in todo:
574 f.remove_done_callback(_on_completion)
575 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
576 todo.clear() # Can't do todo.remove(f) in the loop.
577
578 def _on_completion(f):
579 if not todo:
580 return # _on_timeout() was here first.
581 todo.remove(f)
582 done.put_nowait(f)
583 if not todo and timeout_handle is not None:
584 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700585
586 @coroutine
587 def _wait_for_one():
Guido van Rossumb58f0532014-02-12 17:58:19 -0800588 f = yield from done.get()
589 if f is None:
590 # Dummy value from _on_timeout().
591 raise futures.TimeoutError
592 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700593
Guido van Rossumb58f0532014-02-12 17:58:19 -0800594 for f in todo:
595 f.add_done_callback(_on_completion)
596 if todo and timeout is not None:
597 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700598 for _ in range(len(todo)):
599 yield _wait_for_one()
600
601
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    *result* is returned to the caller once the delay has elapsed.
    """
    future = futures.Future(loop=loop)
    h = future._loop.call_later(delay, future.set_result, result)
    try:
        return (yield from future)
    finally:
        # Cancel the timer handle on every exit path.
        h.cancel()
611
612
613def async(coro_or_future, *, loop=None):
614 """Wrap a coroutine in a future.
615
616 If the argument is a Future, it is returned directly.
617 """
618 if isinstance(coro_or_future, futures.Future):
619 if loop is not None and loop is not coro_or_future._loop:
620 raise ValueError('loop argument must agree with Future')
621 return coro_or_future
622 elif iscoroutine(coro_or_future):
623 return Task(coro_or_future, loop=loop)
624 else:
625 raise TypeError('A Future or coroutine is required')
626
627
class _GatheringFuture(futures.Future):
    """Helper for gather().

    This overrides cancel() to cancel all the children and act more
    like Task.cancel(), which doesn't immediately mark itself as
    cancelled.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children

    def cancel(self):
        """Cancel every child; return False if already done, else True."""
        if self.done():
            return False
        for child in self._children:
            child.cancel()
        return True
646
647
648def gather(*coros_or_futures, loop=None, return_exceptions=False):
649 """Return a future aggregating results from the given coroutines
650 or futures.
651
652 All futures must share the same event loop. If all the tasks are
653 done successfully, the returned future's result is the list of
654 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500655 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700656 exceptions in the tasks are treated the same as successful
657 results, and gathered in the result list; otherwise, the first
658 raised exception will be immediately propagated to the returned
659 future.
660
661 Cancellation: if the outer Future is cancelled, all children (that
662 have not completed yet) are also cancelled. If any child is
663 cancelled, this is treated as if it raised CancelledError --
664 the outer Future is *not* cancelled in this case. (This is to
665 prevent the cancellation of one child to cause other children to
666 be cancelled.)
667 """
Yury Selivanov622be342014-02-06 22:06:16 -0500668 arg_to_fut = {arg: async(arg, loop=loop) for arg in set(coros_or_futures)}
669 children = [arg_to_fut[arg] for arg in coros_or_futures]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700670 n = len(children)
671 if n == 0:
672 outer = futures.Future(loop=loop)
673 outer.set_result([])
674 return outer
675 if loop is None:
676 loop = children[0]._loop
677 for fut in children:
678 if fut._loop is not loop:
679 raise ValueError("futures are tied to different event loops")
680 outer = _GatheringFuture(children, loop=loop)
681 nfinished = 0
682 results = [None] * n
683
684 def _done_callback(i, fut):
685 nonlocal nfinished
686 if outer._state != futures._PENDING:
687 if fut._exception is not None:
688 # Mark exception retrieved.
689 fut.exception()
690 return
691 if fut._state == futures._CANCELLED:
692 res = futures.CancelledError()
693 if not return_exceptions:
694 outer.set_exception(res)
695 return
696 elif fut._exception is not None:
697 res = fut.exception() # Mark exception retrieved.
698 if not return_exceptions:
699 outer.set_exception(res)
700 return
701 else:
702 res = fut._result
703 results[i] = res
704 nfinished += 1
705 if nfinished == n:
706 outer.set_result(results)
707
708 for i, fut in enumerate(children):
709 fut.add_done_callback(functools.partial(_done_callback, i))
710 return outer
711
712
713def shield(arg, *, loop=None):
714 """Wait for a future, shielding it from cancellation.
715
716 The statement
717
718 res = yield from shield(something())
719
720 is exactly equivalent to the statement
721
722 res = yield from something()
723
724 *except* that if the coroutine containing it is cancelled, the
725 task running in something() is not cancelled. From the POV of
726 something(), the cancellation did not happen. But its caller is
727 still cancelled, so the yield-from expression still raises
728 CancelledError. Note: If something() is cancelled by other means
729 this will still cancel shield().
730
731 If you want to completely ignore cancellation (not recommended)
732 you can combine shield() with a try/except clause, as follows:
733
734 try:
735 res = yield from shield(something())
736 except CancelledError:
737 res = None
738 """
739 inner = async(arg, loop=loop)
740 if inner.done():
741 # Shortcut.
742 return inner
743 loop = inner._loop
744 outer = futures.Future(loop=loop)
745
746 def _done_callback(inner):
747 if outer.cancelled():
748 # Mark inner's result as retrieved.
749 inner.cancelled() or inner.exception()
750 return
751 if inner.cancelled():
752 outer.cancel()
753 else:
754 exc = inner.exception()
755 if exc is not None:
756 outer.set_exception(exc)
757 else:
758 outer.set_result(inner.result())
759
760 inner.add_done_callback(_done_callback)
761 return outer