blob: 9bfc1cf81479c68b271ab81a1812436e9201401f [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
Victor Stinnerf951d282014-06-29 00:46:45 +02003__all__ = ['Task',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
5 'wait', 'wait_for', 'as_completed', 'sleep', 'async',
Yury Selivanov59eb9a42015-05-11 14:48:38 -04006 'gather', 'shield', 'ensure_future',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007 ]
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import concurrent.futures
10import functools
11import inspect
12import linecache
13import traceback
Yury Selivanov59eb9a42015-05-11 14:48:38 -040014import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015import weakref
16
Victor Stinner71080fc2015-07-25 02:23:21 +020017from . import compat
Victor Stinnerf951d282014-06-29 00:46:45 +020018from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019from . import events
20from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020021from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024class Task(futures.Future):
25 """A coroutine wrapped in a Future."""
26
27 # An important invariant maintained while a Task not done:
28 #
29 # - Either _fut_waiter is None, and _step() is scheduled;
30 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
31 #
32 # The only transition from the latter to the former is through
33 # _wakeup(). When _fut_waiter is not None, one of its callbacks
34 # must be _wakeup().
35
36 # Weak set containing all tasks alive.
37 _all_tasks = weakref.WeakSet()
38
Guido van Rossum1a605ed2013-12-06 12:57:40 -080039 # Dictionary containing tasks that are currently active in
40 # all running event loops. {EventLoop: Task}
41 _current_tasks = {}
42
Victor Stinnerfe22e092014-12-04 23:00:13 +010043 # If False, don't log a message if the task is destroyed whereas its
44 # status is still pending
45 _log_destroy_pending = True
46
Guido van Rossum1a605ed2013-12-06 12:57:40 -080047 @classmethod
48 def current_task(cls, loop=None):
49 """Return the currently running task in an event loop or None.
50
51 By default the current task for the current event loop is returned.
52
53 None is returned when called not in the context of a Task.
54 """
55 if loop is None:
56 loop = events.get_event_loop()
57 return cls._current_tasks.get(loop)
58
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070059 @classmethod
60 def all_tasks(cls, loop=None):
61 """Return a set of all tasks for an event loop.
62
63 By default all tasks for the current event loop are returned.
64 """
65 if loop is None:
66 loop = events.get_event_loop()
67 return {t for t in cls._all_tasks if t._loop is loop}
68
69 def __init__(self, coro, *, loop=None):
Victor Stinner15cc6782015-01-09 00:09:10 +010070 assert coroutines.iscoroutine(coro), repr(coro)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070071 super().__init__(loop=loop)
Victor Stinner80f53aa2014-06-27 13:52:20 +020072 if self._source_traceback:
73 del self._source_traceback[-1]
Yury Selivanov1ad08a52015-05-28 10:52:19 -040074 self._coro = coro
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070075 self._fut_waiter = None
76 self._must_cancel = False
77 self._loop.call_soon(self._step)
78 self.__class__._all_tasks.add(self)
79
R David Murray8e069d52014-09-24 13:13:45 -040080 # On Python 3.3 or older, objects with a destructor that are part of a
81 # reference cycle are never destroyed. That's not the case any more on
82 # Python 3.4 thanks to the PEP 442.
Victor Stinner71080fc2015-07-25 02:23:21 +020083 if compat.PY34:
Victor Stinnera02f81f2014-06-24 22:37:53 +020084 def __del__(self):
Victor Stinner98b63912014-06-30 14:51:04 +020085 if self._state == futures._PENDING and self._log_destroy_pending:
Victor Stinner80f53aa2014-06-27 13:52:20 +020086 context = {
Victor Stinnera02f81f2014-06-24 22:37:53 +020087 'task': self,
88 'message': 'Task was destroyed but it is pending!',
Victor Stinner80f53aa2014-06-27 13:52:20 +020089 }
90 if self._source_traceback:
91 context['source_traceback'] = self._source_traceback
92 self._loop.call_exception_handler(context)
Victor Stinnera02f81f2014-06-24 22:37:53 +020093 futures.Future.__del__(self)
94
Victor Stinner313a9802014-07-29 12:58:23 +020095 def _repr_info(self):
96 info = super()._repr_info()
97
Victor Stinner975735f2014-06-25 21:41:58 +020098 if self._must_cancel:
Victor Stinner313a9802014-07-29 12:58:23 +020099 # replace status
100 info[0] = 'cancelling'
Victor Stinner975735f2014-06-25 21:41:58 +0200101
Victor Stinnerc39ba7d2014-07-11 00:21:27 +0200102 coro = coroutines._format_coroutine(self._coro)
Victor Stinner313a9802014-07-29 12:58:23 +0200103 info.insert(1, 'coro=<%s>' % coro)
Victor Stinner975735f2014-06-25 21:41:58 +0200104
Victor Stinner2dba23a2014-07-03 00:59:00 +0200105 if self._fut_waiter is not None:
Victor Stinner313a9802014-07-29 12:58:23 +0200106 info.insert(2, 'wait_for=%r' % self._fut_waiter)
107 return info
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700108
109 def get_stack(self, *, limit=None):
110 """Return the list of stack frames for this task's coroutine.
111
Victor Stinnerd87de832014-12-02 17:57:04 +0100112 If the coroutine is not done, this returns the stack where it is
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700113 suspended. If the coroutine has completed successfully or was
114 cancelled, this returns an empty list. If the coroutine was
115 terminated by an exception, this returns the list of traceback
116 frames.
117
118 The frames are always ordered from oldest to newest.
119
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500120 The optional limit gives the maximum number of frames to
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700121 return; by default all available frames are returned. Its
122 meaning differs depending on whether a stack or a traceback is
123 returned: the newest frames of a stack are returned, but the
124 oldest frames of a traceback are returned. (This matches the
125 behavior of the traceback module.)
126
127 For reasons beyond our control, only one stack frame is
128 returned for a suspended coroutine.
129 """
130 frames = []
131 f = self._coro.gi_frame
132 if f is not None:
133 while f is not None:
134 if limit is not None:
135 if limit <= 0:
136 break
137 limit -= 1
138 frames.append(f)
139 f = f.f_back
140 frames.reverse()
141 elif self._exception is not None:
142 tb = self._exception.__traceback__
143 while tb is not None:
144 if limit is not None:
145 if limit <= 0:
146 break
147 limit -= 1
148 frames.append(tb.tb_frame)
149 tb = tb.tb_next
150 return frames
151
152 def print_stack(self, *, limit=None, file=None):
153 """Print the stack or traceback for this task's coroutine.
154
155 This produces output similar to that of the traceback module,
156 for the frames retrieved by get_stack(). The limit argument
157 is passed to get_stack(). The file argument is an I/O stream
R David Murray8e069d52014-09-24 13:13:45 -0400158 to which the output is written; by default output is written
159 to sys.stderr.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700160 """
161 extracted_list = []
162 checked = set()
163 for f in self.get_stack(limit=limit):
164 lineno = f.f_lineno
165 co = f.f_code
166 filename = co.co_filename
167 name = co.co_name
168 if filename not in checked:
169 checked.add(filename)
170 linecache.checkcache(filename)
171 line = linecache.getline(filename, lineno, f.f_globals)
172 extracted_list.append((filename, lineno, name, line))
173 exc = self._exception
174 if not extracted_list:
175 print('No stack for %r' % self, file=file)
176 elif exc is not None:
177 print('Traceback for %r (most recent call last):' % self,
178 file=file)
179 else:
180 print('Stack for %r (most recent call last):' % self,
181 file=file)
182 traceback.print_list(extracted_list, file=file)
183 if exc is not None:
184 for line in traceback.format_exception_only(exc.__class__, exc):
185 print(line, file=file, end='')
186
187 def cancel(self):
R David Murray8e069d52014-09-24 13:13:45 -0400188 """Request that this task cancel itself.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200189
Victor Stinner8d213572014-06-02 23:06:46 +0200190 This arranges for a CancelledError to be thrown into the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200191 wrapped coroutine on the next cycle through the event loop.
192 The coroutine then has a chance to clean up or even deny
193 the request using try/except/finally.
194
R David Murray8e069d52014-09-24 13:13:45 -0400195 Unlike Future.cancel, this does not guarantee that the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200196 task will be cancelled: the exception might be caught and
R David Murray8e069d52014-09-24 13:13:45 -0400197 acted upon, delaying cancellation of the task or preventing
198 cancellation completely. The task may also return a value or
199 raise a different exception.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200200
201 Immediately after this method is called, Task.cancelled() will
202 not return True (unless the task was already cancelled). A
203 task will be marked as cancelled when the wrapped coroutine
204 terminates with a CancelledError exception (even if cancel()
205 was not called).
206 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700207 if self.done():
208 return False
209 if self._fut_waiter is not None:
210 if self._fut_waiter.cancel():
211 # Leave self._fut_waiter; it may be a Task that
212 # catches and ignores the cancellation so we may have
213 # to cancel it again later.
214 return True
215 # It must be the case that self._step is already scheduled.
216 self._must_cancel = True
217 return True
218
219 def _step(self, value=None, exc=None):
220 assert not self.done(), \
221 '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
222 if self._must_cancel:
223 if not isinstance(exc, futures.CancelledError):
224 exc = futures.CancelledError()
225 self._must_cancel = False
226 coro = self._coro
227 self._fut_waiter = None
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800228
229 self.__class__._current_tasks[self._loop] = self
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700230 # Call either coro.throw(exc) or coro.send(value).
231 try:
232 if exc is not None:
233 result = coro.throw(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700234 else:
Yury Selivanov1ad08a52015-05-28 10:52:19 -0400235 result = coro.send(value)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700236 except StopIteration as exc:
237 self.set_result(exc.value)
238 except futures.CancelledError as exc:
239 super().cancel() # I.e., Future.cancel(self).
240 except Exception as exc:
241 self.set_exception(exc)
242 except BaseException as exc:
243 self.set_exception(exc)
244 raise
245 else:
246 if isinstance(result, futures.Future):
247 # Yielded Future must come from Future.__iter__().
248 if result._blocking:
249 result._blocking = False
250 result.add_done_callback(self._wakeup)
251 self._fut_waiter = result
Yury Selivanov4c0a09a2015-08-02 16:49:31 -0400252 if self._must_cancel:
253 if self._fut_waiter.cancel():
254 self._must_cancel = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700255 else:
256 self._loop.call_soon(
257 self._step, None,
258 RuntimeError(
259 'yield was used instead of yield from '
260 'in task {!r} with {!r}'.format(self, result)))
261 elif result is None:
262 # Bare yield relinquishes control for one event loop iteration.
263 self._loop.call_soon(self._step)
264 elif inspect.isgenerator(result):
265 # Yielding a generator is just wrong.
266 self._loop.call_soon(
267 self._step, None,
268 RuntimeError(
269 'yield was used instead of yield from for '
270 'generator in task {!r} with {}'.format(
271 self, result)))
272 else:
273 # Yielding something else is an error.
274 self._loop.call_soon(
275 self._step, None,
276 RuntimeError(
277 'Task got bad yield: {!r}'.format(result)))
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800278 finally:
279 self.__class__._current_tasks.pop(self._loop)
Victor Stinnerd74ac822014-03-04 23:07:08 +0100280 self = None # Needed to break cycles when an exception occurs.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700281
282 def _wakeup(self, future):
283 try:
284 value = future.result()
285 except Exception as exc:
286 # This may also be a cancellation.
287 self._step(None, exc)
288 else:
289 self._step(value, None)
290 self = None # Needed to break cycles when an exception occurs.
291
292
293# wait() and as_completed() similar to those in PEP 3148.
294
295FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
296FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
297ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
298
299
300@coroutine
301def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
302 """Wait for the Futures and coroutines given by fs to complete.
303
Victor Stinnerdb74d982014-06-10 11:16:05 +0200304 The sequence futures must not be empty.
305
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700306 Coroutines will be wrapped in Tasks.
307
308 Returns two sets of Future: (done, pending).
309
310 Usage:
311
312 done, pending = yield from asyncio.wait(fs)
313
314 Note: This does not raise TimeoutError! Futures that aren't done
315 when the timeout occurs are returned in the second set.
316 """
Victor Stinnerf951d282014-06-29 00:46:45 +0200317 if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100318 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700319 if not fs:
320 raise ValueError('Set of coroutines/Futures is empty.')
Victor Stinnere931f7b2014-07-16 18:50:39 +0200321 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
322 raise ValueError('Invalid return_when value: {}'.format(return_when))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700323
324 if loop is None:
325 loop = events.get_event_loop()
326
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400327 fs = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700328
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700329 return (yield from _wait(fs, timeout, return_when, loop))
330
331
Victor Stinner59e08022014-08-28 11:19:25 +0200332def _release_waiter(waiter, *args):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700333 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200334 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700335
336
337@coroutine
338def wait_for(fut, timeout, *, loop=None):
339 """Wait for the single Future or coroutine to complete, with timeout.
340
341 Coroutine will be wrapped in Task.
342
Victor Stinner421e49b2014-01-23 17:40:59 +0100343 Returns result of the Future or coroutine. When a timeout occurs,
344 it cancels the task and raises TimeoutError. To avoid the task
345 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700346
Victor Stinner922bc2c2015-01-15 16:29:10 +0100347 If the wait is cancelled, the task is also cancelled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700348
Victor Stinner922bc2c2015-01-15 16:29:10 +0100349 This function is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700350 """
351 if loop is None:
352 loop = events.get_event_loop()
353
Guido van Rossum48c66c32014-01-29 14:30:38 -0800354 if timeout is None:
355 return (yield from fut)
356
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700357 waiter = futures.Future(loop=loop)
Victor Stinner59e08022014-08-28 11:19:25 +0200358 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
359 cb = functools.partial(_release_waiter, waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700360
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400361 fut = ensure_future(fut, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700362 fut.add_done_callback(cb)
363
364 try:
Victor Stinner59e08022014-08-28 11:19:25 +0200365 # wait until the future completes or the timeout
Victor Stinner922bc2c2015-01-15 16:29:10 +0100366 try:
367 yield from waiter
368 except futures.CancelledError:
369 fut.remove_done_callback(cb)
370 fut.cancel()
371 raise
Victor Stinner59e08022014-08-28 11:19:25 +0200372
373 if fut.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700374 return fut.result()
375 else:
376 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100377 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700378 raise futures.TimeoutError()
379 finally:
380 timeout_handle.cancel()
381
382
383@coroutine
384def _wait(fs, timeout, return_when, loop):
385 """Internal helper for wait() and _wait_for().
386
387 The fs argument must be a collection of Futures.
388 """
389 assert fs, 'Set of Futures is empty.'
390 waiter = futures.Future(loop=loop)
391 timeout_handle = None
392 if timeout is not None:
393 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
394 counter = len(fs)
395
396 def _on_completion(f):
397 nonlocal counter
398 counter -= 1
399 if (counter <= 0 or
400 return_when == FIRST_COMPLETED or
401 return_when == FIRST_EXCEPTION and (not f.cancelled() and
402 f.exception() is not None)):
403 if timeout_handle is not None:
404 timeout_handle.cancel()
405 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200406 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700407
408 for f in fs:
409 f.add_done_callback(_on_completion)
410
411 try:
412 yield from waiter
413 finally:
414 if timeout_handle is not None:
415 timeout_handle.cancel()
416
417 done, pending = set(), set()
418 for f in fs:
419 f.remove_done_callback(_on_completion)
420 if f.done():
421 done.add(f)
422 else:
423 pending.add(f)
424 return done, pending
425
426
427# This is *not* a @coroutine! It is just an iterator (yielding Futures).
428def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800429 """Return an iterator whose values are coroutines.
430
431 When waiting for the yielded coroutines you'll get the results (or
432 exceptions!) of the original Futures (or coroutines), in the order
433 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700434
435 This differs from PEP 3148; the proper way to use this is:
436
437 for f in as_completed(fs):
438 result = yield from f # The 'yield from' may raise.
439 # Use result.
440
Guido van Rossumb58f0532014-02-12 17:58:19 -0800441 If a timeout is specified, the 'yield from' will raise
442 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700443
444 Note: The futures 'f' are not necessarily members of fs.
445 """
Victor Stinnerf951d282014-06-29 00:46:45 +0200446 if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100447 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700448 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400449 todo = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800450 from .queues import Queue # Import here to avoid circular import problem.
451 done = Queue(loop=loop)
452 timeout_handle = None
453
454 def _on_timeout():
455 for f in todo:
456 f.remove_done_callback(_on_completion)
457 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
458 todo.clear() # Can't do todo.remove(f) in the loop.
459
460 def _on_completion(f):
461 if not todo:
462 return # _on_timeout() was here first.
463 todo.remove(f)
464 done.put_nowait(f)
465 if not todo and timeout_handle is not None:
466 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700467
468 @coroutine
469 def _wait_for_one():
Guido van Rossumb58f0532014-02-12 17:58:19 -0800470 f = yield from done.get()
471 if f is None:
472 # Dummy value from _on_timeout().
473 raise futures.TimeoutError
474 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700475
Guido van Rossumb58f0532014-02-12 17:58:19 -0800476 for f in todo:
477 f.add_done_callback(_on_completion)
478 if todo and timeout is not None:
479 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700480 for _ in range(len(todo)):
481 yield _wait_for_one()
482
483
484@coroutine
485def sleep(delay, result=None, *, loop=None):
486 """Coroutine that completes after a given time (in seconds)."""
487 future = futures.Future(loop=loop)
Victor Stinnera9acbe82014-07-05 15:29:41 +0200488 h = future._loop.call_later(delay,
489 future._set_result_unless_cancelled, result)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700490 try:
491 return (yield from future)
492 finally:
493 h.cancel()
494
495
496def async(coro_or_future, *, loop=None):
497 """Wrap a coroutine in a future.
498
499 If the argument is a Future, it is returned directly.
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400500
501 This function is deprecated in 3.5. Use asyncio.ensure_future() instead.
502 """
503
504 warnings.warn("asyncio.async() function is deprecated, use ensure_future()",
505 DeprecationWarning)
506
507 return ensure_future(coro_or_future, loop=loop)
508
509
510def ensure_future(coro_or_future, *, loop=None):
511 """Wrap a coroutine in a future.
512
513 If the argument is a Future, it is returned directly.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700514 """
515 if isinstance(coro_or_future, futures.Future):
516 if loop is not None and loop is not coro_or_future._loop:
517 raise ValueError('loop argument must agree with Future')
518 return coro_or_future
Victor Stinnerf951d282014-06-29 00:46:45 +0200519 elif coroutines.iscoroutine(coro_or_future):
Victor Stinner896a25a2014-07-08 11:29:25 +0200520 if loop is None:
521 loop = events.get_event_loop()
522 task = loop.create_task(coro_or_future)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200523 if task._source_traceback:
524 del task._source_traceback[-1]
525 return task
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700526 else:
527 raise TypeError('A Future or coroutine is required')
528
529
530class _GatheringFuture(futures.Future):
531 """Helper for gather().
532
533 This overrides cancel() to cancel all the children and act more
534 like Task.cancel(), which doesn't immediately mark itself as
535 cancelled.
536 """
537
538 def __init__(self, children, *, loop=None):
539 super().__init__(loop=loop)
540 self._children = children
541
542 def cancel(self):
543 if self.done():
544 return False
545 for child in self._children:
546 child.cancel()
547 return True
548
549
550def gather(*coros_or_futures, loop=None, return_exceptions=False):
551 """Return a future aggregating results from the given coroutines
552 or futures.
553
554 All futures must share the same event loop. If all the tasks are
555 done successfully, the returned future's result is the list of
556 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500557 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700558 exceptions in the tasks are treated the same as successful
559 results, and gathered in the result list; otherwise, the first
560 raised exception will be immediately propagated to the returned
561 future.
562
563 Cancellation: if the outer Future is cancelled, all children (that
564 have not completed yet) are also cancelled. If any child is
565 cancelled, this is treated as if it raised CancelledError --
566 the outer Future is *not* cancelled in this case. (This is to
567 prevent the cancellation of one child to cause other children to
568 be cancelled.)
569 """
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200570 if not coros_or_futures:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700571 outer = futures.Future(loop=loop)
572 outer.set_result([])
573 return outer
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200574
575 arg_to_fut = {}
576 for arg in set(coros_or_futures):
577 if not isinstance(arg, futures.Future):
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400578 fut = ensure_future(arg, loop=loop)
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200579 if loop is None:
580 loop = fut._loop
581 # The caller cannot control this future, the "destroy pending task"
582 # warning should not be emitted.
583 fut._log_destroy_pending = False
584 else:
585 fut = arg
586 if loop is None:
587 loop = fut._loop
588 elif fut._loop is not loop:
589 raise ValueError("futures are tied to different event loops")
590 arg_to_fut[arg] = fut
591
592 children = [arg_to_fut[arg] for arg in coros_or_futures]
593 nchildren = len(children)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700594 outer = _GatheringFuture(children, loop=loop)
595 nfinished = 0
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200596 results = [None] * nchildren
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700597
598 def _done_callback(i, fut):
599 nonlocal nfinished
Victor Stinner3531d902015-01-09 01:42:52 +0100600 if outer.done():
601 if not fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700602 # Mark exception retrieved.
603 fut.exception()
604 return
Victor Stinner3531d902015-01-09 01:42:52 +0100605
Victor Stinner29342622015-01-29 14:15:19 +0100606 if fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700607 res = futures.CancelledError()
608 if not return_exceptions:
609 outer.set_exception(res)
610 return
611 elif fut._exception is not None:
612 res = fut.exception() # Mark exception retrieved.
613 if not return_exceptions:
614 outer.set_exception(res)
615 return
616 else:
617 res = fut._result
618 results[i] = res
619 nfinished += 1
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200620 if nfinished == nchildren:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700621 outer.set_result(results)
622
623 for i, fut in enumerate(children):
624 fut.add_done_callback(functools.partial(_done_callback, i))
625 return outer
626
627
628def shield(arg, *, loop=None):
629 """Wait for a future, shielding it from cancellation.
630
631 The statement
632
633 res = yield from shield(something())
634
635 is exactly equivalent to the statement
636
637 res = yield from something()
638
639 *except* that if the coroutine containing it is cancelled, the
640 task running in something() is not cancelled. From the POV of
641 something(), the cancellation did not happen. But its caller is
642 still cancelled, so the yield-from expression still raises
643 CancelledError. Note: If something() is cancelled by other means
644 this will still cancel shield().
645
646 If you want to completely ignore cancellation (not recommended)
647 you can combine shield() with a try/except clause, as follows:
648
649 try:
650 res = yield from shield(something())
651 except CancelledError:
652 res = None
653 """
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400654 inner = ensure_future(arg, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700655 if inner.done():
656 # Shortcut.
657 return inner
658 loop = inner._loop
659 outer = futures.Future(loop=loop)
660
661 def _done_callback(inner):
662 if outer.cancelled():
Victor Stinner3531d902015-01-09 01:42:52 +0100663 if not inner.cancelled():
664 # Mark inner's result as retrieved.
665 inner.exception()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700666 return
Victor Stinner3531d902015-01-09 01:42:52 +0100667
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700668 if inner.cancelled():
669 outer.cancel()
670 else:
671 exc = inner.exception()
672 if exc is not None:
673 outer.set_exception(exc)
674 else:
675 outer.set_result(inner.result())
676
677 inner.add_done_callback(_done_callback)
678 return outer