blob: fcb383389ca55f46ac06395ccfaae322df845be9 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
Victor Stinnerf951d282014-06-29 00:46:45 +02003__all__ = ['Task',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
5 'wait', 'wait_for', 'as_completed', 'sleep', 'async',
Yury Selivanov59eb9a42015-05-11 14:48:38 -04006 'gather', 'shield', 'ensure_future',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007 ]
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import concurrent.futures
10import functools
11import inspect
12import linecache
Victor Stinner7ef60cd2014-02-19 23:15:02 +010013import sys
Yury Selivanov1af2bf72015-05-11 22:27:25 -040014import types
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015import traceback
Yury Selivanov59eb9a42015-05-11 14:48:38 -040016import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017import weakref
18
Victor Stinnerf951d282014-06-29 00:46:45 +020019from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import events
21from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020022from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
Victor Stinnera02f81f2014-06-24 22:37:53 +020024_PY34 = (sys.version_info >= (3, 4))
Victor Stinner8d3e02e2014-06-18 01:14:59 +020025
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027class Task(futures.Future):
28 """A coroutine wrapped in a Future."""
29
30 # An important invariant maintained while a Task not done:
31 #
32 # - Either _fut_waiter is None, and _step() is scheduled;
33 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
34 #
35 # The only transition from the latter to the former is through
36 # _wakeup(). When _fut_waiter is not None, one of its callbacks
37 # must be _wakeup().
38
39 # Weak set containing all tasks alive.
40 _all_tasks = weakref.WeakSet()
41
Guido van Rossum1a605ed2013-12-06 12:57:40 -080042 # Dictionary containing tasks that are currently active in
43 # all running event loops. {EventLoop: Task}
44 _current_tasks = {}
45
Victor Stinnerfe22e092014-12-04 23:00:13 +010046 # If False, don't log a message if the task is destroyed whereas its
47 # status is still pending
48 _log_destroy_pending = True
49
Guido van Rossum1a605ed2013-12-06 12:57:40 -080050 @classmethod
51 def current_task(cls, loop=None):
52 """Return the currently running task in an event loop or None.
53
54 By default the current task for the current event loop is returned.
55
56 None is returned when called not in the context of a Task.
57 """
58 if loop is None:
59 loop = events.get_event_loop()
60 return cls._current_tasks.get(loop)
61
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070062 @classmethod
63 def all_tasks(cls, loop=None):
64 """Return a set of all tasks for an event loop.
65
66 By default all tasks for the current event loop are returned.
67 """
68 if loop is None:
69 loop = events.get_event_loop()
70 return {t for t in cls._all_tasks if t._loop is loop}
71
72 def __init__(self, coro, *, loop=None):
Victor Stinner15cc6782015-01-09 00:09:10 +010073 assert coroutines.iscoroutine(coro), repr(coro)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070074 super().__init__(loop=loop)
Victor Stinner80f53aa2014-06-27 13:52:20 +020075 if self._source_traceback:
76 del self._source_traceback[-1]
Yury Selivanov1af2bf72015-05-11 22:27:25 -040077 if coro.__class__ is types.GeneratorType:
78 self._coro = coro
79 else:
80 self._coro = iter(coro) # Use the iterator just in case.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070081 self._fut_waiter = None
82 self._must_cancel = False
83 self._loop.call_soon(self._step)
84 self.__class__._all_tasks.add(self)
85
R David Murray8e069d52014-09-24 13:13:45 -040086 # On Python 3.3 or older, objects with a destructor that are part of a
87 # reference cycle are never destroyed. That's not the case any more on
88 # Python 3.4 thanks to the PEP 442.
Victor Stinnera02f81f2014-06-24 22:37:53 +020089 if _PY34:
90 def __del__(self):
Victor Stinner98b63912014-06-30 14:51:04 +020091 if self._state == futures._PENDING and self._log_destroy_pending:
Victor Stinner80f53aa2014-06-27 13:52:20 +020092 context = {
Victor Stinnera02f81f2014-06-24 22:37:53 +020093 'task': self,
94 'message': 'Task was destroyed but it is pending!',
Victor Stinner80f53aa2014-06-27 13:52:20 +020095 }
96 if self._source_traceback:
97 context['source_traceback'] = self._source_traceback
98 self._loop.call_exception_handler(context)
Victor Stinnera02f81f2014-06-24 22:37:53 +020099 futures.Future.__del__(self)
100
Victor Stinner313a9802014-07-29 12:58:23 +0200101 def _repr_info(self):
102 info = super()._repr_info()
103
Victor Stinner975735f2014-06-25 21:41:58 +0200104 if self._must_cancel:
Victor Stinner313a9802014-07-29 12:58:23 +0200105 # replace status
106 info[0] = 'cancelling'
Victor Stinner975735f2014-06-25 21:41:58 +0200107
Victor Stinnerc39ba7d2014-07-11 00:21:27 +0200108 coro = coroutines._format_coroutine(self._coro)
Victor Stinner313a9802014-07-29 12:58:23 +0200109 info.insert(1, 'coro=<%s>' % coro)
Victor Stinner975735f2014-06-25 21:41:58 +0200110
Victor Stinner2dba23a2014-07-03 00:59:00 +0200111 if self._fut_waiter is not None:
Victor Stinner313a9802014-07-29 12:58:23 +0200112 info.insert(2, 'wait_for=%r' % self._fut_waiter)
113 return info
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700114
115 def get_stack(self, *, limit=None):
116 """Return the list of stack frames for this task's coroutine.
117
Victor Stinnerd87de832014-12-02 17:57:04 +0100118 If the coroutine is not done, this returns the stack where it is
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700119 suspended. If the coroutine has completed successfully or was
120 cancelled, this returns an empty list. If the coroutine was
121 terminated by an exception, this returns the list of traceback
122 frames.
123
124 The frames are always ordered from oldest to newest.
125
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500126 The optional limit gives the maximum number of frames to
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700127 return; by default all available frames are returned. Its
128 meaning differs depending on whether a stack or a traceback is
129 returned: the newest frames of a stack are returned, but the
130 oldest frames of a traceback are returned. (This matches the
131 behavior of the traceback module.)
132
133 For reasons beyond our control, only one stack frame is
134 returned for a suspended coroutine.
135 """
136 frames = []
137 f = self._coro.gi_frame
138 if f is not None:
139 while f is not None:
140 if limit is not None:
141 if limit <= 0:
142 break
143 limit -= 1
144 frames.append(f)
145 f = f.f_back
146 frames.reverse()
147 elif self._exception is not None:
148 tb = self._exception.__traceback__
149 while tb is not None:
150 if limit is not None:
151 if limit <= 0:
152 break
153 limit -= 1
154 frames.append(tb.tb_frame)
155 tb = tb.tb_next
156 return frames
157
158 def print_stack(self, *, limit=None, file=None):
159 """Print the stack or traceback for this task's coroutine.
160
161 This produces output similar to that of the traceback module,
162 for the frames retrieved by get_stack(). The limit argument
163 is passed to get_stack(). The file argument is an I/O stream
R David Murray8e069d52014-09-24 13:13:45 -0400164 to which the output is written; by default output is written
165 to sys.stderr.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700166 """
167 extracted_list = []
168 checked = set()
169 for f in self.get_stack(limit=limit):
170 lineno = f.f_lineno
171 co = f.f_code
172 filename = co.co_filename
173 name = co.co_name
174 if filename not in checked:
175 checked.add(filename)
176 linecache.checkcache(filename)
177 line = linecache.getline(filename, lineno, f.f_globals)
178 extracted_list.append((filename, lineno, name, line))
179 exc = self._exception
180 if not extracted_list:
181 print('No stack for %r' % self, file=file)
182 elif exc is not None:
183 print('Traceback for %r (most recent call last):' % self,
184 file=file)
185 else:
186 print('Stack for %r (most recent call last):' % self,
187 file=file)
188 traceback.print_list(extracted_list, file=file)
189 if exc is not None:
190 for line in traceback.format_exception_only(exc.__class__, exc):
191 print(line, file=file, end='')
192
193 def cancel(self):
R David Murray8e069d52014-09-24 13:13:45 -0400194 """Request that this task cancel itself.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200195
Victor Stinner8d213572014-06-02 23:06:46 +0200196 This arranges for a CancelledError to be thrown into the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200197 wrapped coroutine on the next cycle through the event loop.
198 The coroutine then has a chance to clean up or even deny
199 the request using try/except/finally.
200
R David Murray8e069d52014-09-24 13:13:45 -0400201 Unlike Future.cancel, this does not guarantee that the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200202 task will be cancelled: the exception might be caught and
R David Murray8e069d52014-09-24 13:13:45 -0400203 acted upon, delaying cancellation of the task or preventing
204 cancellation completely. The task may also return a value or
205 raise a different exception.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200206
207 Immediately after this method is called, Task.cancelled() will
208 not return True (unless the task was already cancelled). A
209 task will be marked as cancelled when the wrapped coroutine
210 terminates with a CancelledError exception (even if cancel()
211 was not called).
212 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700213 if self.done():
214 return False
215 if self._fut_waiter is not None:
216 if self._fut_waiter.cancel():
217 # Leave self._fut_waiter; it may be a Task that
218 # catches and ignores the cancellation so we may have
219 # to cancel it again later.
220 return True
221 # It must be the case that self._step is already scheduled.
222 self._must_cancel = True
223 return True
224
225 def _step(self, value=None, exc=None):
226 assert not self.done(), \
227 '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
228 if self._must_cancel:
229 if not isinstance(exc, futures.CancelledError):
230 exc = futures.CancelledError()
231 self._must_cancel = False
232 coro = self._coro
233 self._fut_waiter = None
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800234
235 self.__class__._current_tasks[self._loop] = self
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700236 # Call either coro.throw(exc) or coro.send(value).
237 try:
238 if exc is not None:
239 result = coro.throw(exc)
240 elif value is not None:
241 result = coro.send(value)
242 else:
Yury Selivanov1af2bf72015-05-11 22:27:25 -0400243 result = coro.send(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700244 except StopIteration as exc:
245 self.set_result(exc.value)
246 except futures.CancelledError as exc:
247 super().cancel() # I.e., Future.cancel(self).
248 except Exception as exc:
249 self.set_exception(exc)
250 except BaseException as exc:
251 self.set_exception(exc)
252 raise
253 else:
254 if isinstance(result, futures.Future):
255 # Yielded Future must come from Future.__iter__().
256 if result._blocking:
257 result._blocking = False
258 result.add_done_callback(self._wakeup)
259 self._fut_waiter = result
260 if self._must_cancel:
261 if self._fut_waiter.cancel():
262 self._must_cancel = False
263 else:
264 self._loop.call_soon(
265 self._step, None,
266 RuntimeError(
267 'yield was used instead of yield from '
268 'in task {!r} with {!r}'.format(self, result)))
269 elif result is None:
270 # Bare yield relinquishes control for one event loop iteration.
271 self._loop.call_soon(self._step)
272 elif inspect.isgenerator(result):
273 # Yielding a generator is just wrong.
274 self._loop.call_soon(
275 self._step, None,
276 RuntimeError(
277 'yield was used instead of yield from for '
278 'generator in task {!r} with {}'.format(
279 self, result)))
280 else:
281 # Yielding something else is an error.
282 self._loop.call_soon(
283 self._step, None,
284 RuntimeError(
285 'Task got bad yield: {!r}'.format(result)))
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800286 finally:
287 self.__class__._current_tasks.pop(self._loop)
Victor Stinnerd74ac822014-03-04 23:07:08 +0100288 self = None # Needed to break cycles when an exception occurs.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700289
290 def _wakeup(self, future):
291 try:
292 value = future.result()
293 except Exception as exc:
294 # This may also be a cancellation.
295 self._step(None, exc)
296 else:
297 self._step(value, None)
298 self = None # Needed to break cycles when an exception occurs.
299
300
301# wait() and as_completed() similar to those in PEP 3148.
302
303FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
304FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
305ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
306
307
308@coroutine
309def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
310 """Wait for the Futures and coroutines given by fs to complete.
311
Victor Stinnerdb74d982014-06-10 11:16:05 +0200312 The sequence futures must not be empty.
313
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700314 Coroutines will be wrapped in Tasks.
315
316 Returns two sets of Future: (done, pending).
317
318 Usage:
319
320 done, pending = yield from asyncio.wait(fs)
321
322 Note: This does not raise TimeoutError! Futures that aren't done
323 when the timeout occurs are returned in the second set.
324 """
Victor Stinnerf951d282014-06-29 00:46:45 +0200325 if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100326 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700327 if not fs:
328 raise ValueError('Set of coroutines/Futures is empty.')
Victor Stinnere931f7b2014-07-16 18:50:39 +0200329 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
330 raise ValueError('Invalid return_when value: {}'.format(return_when))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700331
332 if loop is None:
333 loop = events.get_event_loop()
334
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400335 fs = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700336
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700337 return (yield from _wait(fs, timeout, return_when, loop))
338
339
Victor Stinner59e08022014-08-28 11:19:25 +0200340def _release_waiter(waiter, *args):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700341 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200342 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700343
344
345@coroutine
346def wait_for(fut, timeout, *, loop=None):
347 """Wait for the single Future or coroutine to complete, with timeout.
348
349 Coroutine will be wrapped in Task.
350
Victor Stinner421e49b2014-01-23 17:40:59 +0100351 Returns result of the Future or coroutine. When a timeout occurs,
352 it cancels the task and raises TimeoutError. To avoid the task
353 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700354
Victor Stinner922bc2c2015-01-15 16:29:10 +0100355 If the wait is cancelled, the task is also cancelled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700356
Victor Stinner922bc2c2015-01-15 16:29:10 +0100357 This function is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700358 """
359 if loop is None:
360 loop = events.get_event_loop()
361
Guido van Rossum48c66c32014-01-29 14:30:38 -0800362 if timeout is None:
363 return (yield from fut)
364
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700365 waiter = futures.Future(loop=loop)
Victor Stinner59e08022014-08-28 11:19:25 +0200366 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
367 cb = functools.partial(_release_waiter, waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700368
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400369 fut = ensure_future(fut, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700370 fut.add_done_callback(cb)
371
372 try:
Victor Stinner59e08022014-08-28 11:19:25 +0200373 # wait until the future completes or the timeout
Victor Stinner922bc2c2015-01-15 16:29:10 +0100374 try:
375 yield from waiter
376 except futures.CancelledError:
377 fut.remove_done_callback(cb)
378 fut.cancel()
379 raise
Victor Stinner59e08022014-08-28 11:19:25 +0200380
381 if fut.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700382 return fut.result()
383 else:
384 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100385 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700386 raise futures.TimeoutError()
387 finally:
388 timeout_handle.cancel()
389
390
391@coroutine
392def _wait(fs, timeout, return_when, loop):
393 """Internal helper for wait() and _wait_for().
394
395 The fs argument must be a collection of Futures.
396 """
397 assert fs, 'Set of Futures is empty.'
398 waiter = futures.Future(loop=loop)
399 timeout_handle = None
400 if timeout is not None:
401 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
402 counter = len(fs)
403
404 def _on_completion(f):
405 nonlocal counter
406 counter -= 1
407 if (counter <= 0 or
408 return_when == FIRST_COMPLETED or
409 return_when == FIRST_EXCEPTION and (not f.cancelled() and
410 f.exception() is not None)):
411 if timeout_handle is not None:
412 timeout_handle.cancel()
413 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200414 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700415
416 for f in fs:
417 f.add_done_callback(_on_completion)
418
419 try:
420 yield from waiter
421 finally:
422 if timeout_handle is not None:
423 timeout_handle.cancel()
424
425 done, pending = set(), set()
426 for f in fs:
427 f.remove_done_callback(_on_completion)
428 if f.done():
429 done.add(f)
430 else:
431 pending.add(f)
432 return done, pending
433
434
435# This is *not* a @coroutine! It is just an iterator (yielding Futures).
436def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800437 """Return an iterator whose values are coroutines.
438
439 When waiting for the yielded coroutines you'll get the results (or
440 exceptions!) of the original Futures (or coroutines), in the order
441 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700442
443 This differs from PEP 3148; the proper way to use this is:
444
445 for f in as_completed(fs):
446 result = yield from f # The 'yield from' may raise.
447 # Use result.
448
Guido van Rossumb58f0532014-02-12 17:58:19 -0800449 If a timeout is specified, the 'yield from' will raise
450 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700451
452 Note: The futures 'f' are not necessarily members of fs.
453 """
Victor Stinnerf951d282014-06-29 00:46:45 +0200454 if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100455 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700456 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400457 todo = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800458 from .queues import Queue # Import here to avoid circular import problem.
459 done = Queue(loop=loop)
460 timeout_handle = None
461
462 def _on_timeout():
463 for f in todo:
464 f.remove_done_callback(_on_completion)
465 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
466 todo.clear() # Can't do todo.remove(f) in the loop.
467
468 def _on_completion(f):
469 if not todo:
470 return # _on_timeout() was here first.
471 todo.remove(f)
472 done.put_nowait(f)
473 if not todo and timeout_handle is not None:
474 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700475
476 @coroutine
477 def _wait_for_one():
Guido van Rossumb58f0532014-02-12 17:58:19 -0800478 f = yield from done.get()
479 if f is None:
480 # Dummy value from _on_timeout().
481 raise futures.TimeoutError
482 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700483
Guido van Rossumb58f0532014-02-12 17:58:19 -0800484 for f in todo:
485 f.add_done_callback(_on_completion)
486 if todo and timeout is not None:
487 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700488 for _ in range(len(todo)):
489 yield _wait_for_one()
490
491
492@coroutine
493def sleep(delay, result=None, *, loop=None):
494 """Coroutine that completes after a given time (in seconds)."""
495 future = futures.Future(loop=loop)
Victor Stinnera9acbe82014-07-05 15:29:41 +0200496 h = future._loop.call_later(delay,
497 future._set_result_unless_cancelled, result)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700498 try:
499 return (yield from future)
500 finally:
501 h.cancel()
502
503
504def async(coro_or_future, *, loop=None):
505 """Wrap a coroutine in a future.
506
507 If the argument is a Future, it is returned directly.
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400508
509 This function is deprecated in 3.5. Use asyncio.ensure_future() instead.
510 """
511
512 warnings.warn("asyncio.async() function is deprecated, use ensure_future()",
513 DeprecationWarning)
514
515 return ensure_future(coro_or_future, loop=loop)
516
517
518def ensure_future(coro_or_future, *, loop=None):
519 """Wrap a coroutine in a future.
520
521 If the argument is a Future, it is returned directly.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700522 """
523 if isinstance(coro_or_future, futures.Future):
524 if loop is not None and loop is not coro_or_future._loop:
525 raise ValueError('loop argument must agree with Future')
526 return coro_or_future
Victor Stinnerf951d282014-06-29 00:46:45 +0200527 elif coroutines.iscoroutine(coro_or_future):
Victor Stinner896a25a2014-07-08 11:29:25 +0200528 if loop is None:
529 loop = events.get_event_loop()
530 task = loop.create_task(coro_or_future)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200531 if task._source_traceback:
532 del task._source_traceback[-1]
533 return task
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700534 else:
535 raise TypeError('A Future or coroutine is required')
536
537
538class _GatheringFuture(futures.Future):
539 """Helper for gather().
540
541 This overrides cancel() to cancel all the children and act more
542 like Task.cancel(), which doesn't immediately mark itself as
543 cancelled.
544 """
545
546 def __init__(self, children, *, loop=None):
547 super().__init__(loop=loop)
548 self._children = children
549
550 def cancel(self):
551 if self.done():
552 return False
553 for child in self._children:
554 child.cancel()
555 return True
556
557
558def gather(*coros_or_futures, loop=None, return_exceptions=False):
559 """Return a future aggregating results from the given coroutines
560 or futures.
561
562 All futures must share the same event loop. If all the tasks are
563 done successfully, the returned future's result is the list of
564 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500565 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700566 exceptions in the tasks are treated the same as successful
567 results, and gathered in the result list; otherwise, the first
568 raised exception will be immediately propagated to the returned
569 future.
570
571 Cancellation: if the outer Future is cancelled, all children (that
572 have not completed yet) are also cancelled. If any child is
573 cancelled, this is treated as if it raised CancelledError --
574 the outer Future is *not* cancelled in this case. (This is to
575 prevent the cancellation of one child to cause other children to
576 be cancelled.)
577 """
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200578 if not coros_or_futures:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700579 outer = futures.Future(loop=loop)
580 outer.set_result([])
581 return outer
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200582
583 arg_to_fut = {}
584 for arg in set(coros_or_futures):
585 if not isinstance(arg, futures.Future):
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400586 fut = ensure_future(arg, loop=loop)
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200587 if loop is None:
588 loop = fut._loop
589 # The caller cannot control this future, the "destroy pending task"
590 # warning should not be emitted.
591 fut._log_destroy_pending = False
592 else:
593 fut = arg
594 if loop is None:
595 loop = fut._loop
596 elif fut._loop is not loop:
597 raise ValueError("futures are tied to different event loops")
598 arg_to_fut[arg] = fut
599
600 children = [arg_to_fut[arg] for arg in coros_or_futures]
601 nchildren = len(children)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700602 outer = _GatheringFuture(children, loop=loop)
603 nfinished = 0
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200604 results = [None] * nchildren
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700605
606 def _done_callback(i, fut):
607 nonlocal nfinished
Victor Stinner3531d902015-01-09 01:42:52 +0100608 if outer.done():
609 if not fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700610 # Mark exception retrieved.
611 fut.exception()
612 return
Victor Stinner3531d902015-01-09 01:42:52 +0100613
Victor Stinner29342622015-01-29 14:15:19 +0100614 if fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700615 res = futures.CancelledError()
616 if not return_exceptions:
617 outer.set_exception(res)
618 return
619 elif fut._exception is not None:
620 res = fut.exception() # Mark exception retrieved.
621 if not return_exceptions:
622 outer.set_exception(res)
623 return
624 else:
625 res = fut._result
626 results[i] = res
627 nfinished += 1
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200628 if nfinished == nchildren:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700629 outer.set_result(results)
630
631 for i, fut in enumerate(children):
632 fut.add_done_callback(functools.partial(_done_callback, i))
633 return outer
634
635
636def shield(arg, *, loop=None):
637 """Wait for a future, shielding it from cancellation.
638
639 The statement
640
641 res = yield from shield(something())
642
643 is exactly equivalent to the statement
644
645 res = yield from something()
646
647 *except* that if the coroutine containing it is cancelled, the
648 task running in something() is not cancelled. From the POV of
649 something(), the cancellation did not happen. But its caller is
650 still cancelled, so the yield-from expression still raises
651 CancelledError. Note: If something() is cancelled by other means
652 this will still cancel shield().
653
654 If you want to completely ignore cancellation (not recommended)
655 you can combine shield() with a try/except clause, as follows:
656
657 try:
658 res = yield from shield(something())
659 except CancelledError:
660 res = None
661 """
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400662 inner = ensure_future(arg, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700663 if inner.done():
664 # Shortcut.
665 return inner
666 loop = inner._loop
667 outer = futures.Future(loop=loop)
668
669 def _done_callback(inner):
670 if outer.cancelled():
Victor Stinner3531d902015-01-09 01:42:52 +0100671 if not inner.cancelled():
672 # Mark inner's result as retrieved.
673 inner.exception()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700674 return
Victor Stinner3531d902015-01-09 01:42:52 +0100675
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700676 if inner.cancelled():
677 outer.cancel()
678 else:
679 exc = inner.exception()
680 if exc is not None:
681 outer.set_exception(exc)
682 else:
683 outer.set_result(inner.result())
684
685 inner.add_done_callback(_done_callback)
686 return outer