blob: f617b62be2c0717c71ff65c263e2e5ec65653d66 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
Victor Stinnerf951d282014-06-29 00:46:45 +02003__all__ = ['Task',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
5 'wait', 'wait_for', 'as_completed', 'sleep', 'async',
Yury Selivanov59eb9a42015-05-11 14:48:38 -04006 'gather', 'shield', 'ensure_future',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007 ]
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import concurrent.futures
10import functools
11import inspect
12import linecache
Victor Stinner7ef60cd2014-02-19 23:15:02 +010013import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import traceback
Yury Selivanov59eb9a42015-05-11 14:48:38 -040015import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import weakref
17
Victor Stinnerf951d282014-06-29 00:46:45 +020018from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019from . import events
20from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020021from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
# True when running on Python 3.4 or newer.  Task.__del__ is only defined
# when this flag is set.
_PY34 = (sys.version_info >= (3, 4))
Victor Stinner8d3e02e2014-06-18 01:14:59 +020024
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
class Task(futures.Future):
    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task is not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # Weak set containing all tasks alive.
    _all_tasks = weakref.WeakSet()

    # Dictionary containing tasks that are currently active in
    # all running event loops.  {EventLoop: Task}
    _current_tasks = {}

    # If False, don't log a message if the task is destroyed while its
    # status is still pending.
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        if loop is None:
            loop = events.get_event_loop()
        return cls._current_tasks.get(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        if loop is None:
            loop = events.get_event_loop()
        return {t for t in cls._all_tasks if t._loop is loop}

    def __init__(self, coro, *, loop=None):
        """Wrap the coroutine object *coro* and schedule its first step.

        The first _step() call is queued with loop.call_soon(), and the
        new task is registered in the class-wide _all_tasks weak set.
        """
        assert coroutines.iscoroutine(coro), repr(coro)
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the newest entry of the recorded creation traceback
            # (presumably this constructor's own frame -- confirm against
            # Future.__init__).
            del self._source_traceback[-1]
        self._coro = iter(coro)  # Use the iterator just in case.
        self._fut_waiter = None
        self._must_cancel = False
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    # On Python 3.3 or older, objects with a destructor that are part of a
    # reference cycle are never destroyed. That's not the case any more on
    # Python 3.4 thanks to the PEP 442.
    if _PY34:
        def __del__(self):
            """Report a still-pending task to the loop's exception handler.

            Nothing is reported when _log_destroy_pending is false.  The
            base Future.__del__ is always invoked afterwards.
            """
            if self._state == futures._PENDING and self._log_destroy_pending:
                context = {
                    'task': self,
                    'message': 'Task was destroyed but it is pending!',
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
            futures.Future.__del__(self)

    def _repr_info(self):
        """Extend the Future's repr fields with task-specific details.

        Adds the formatted coroutine, the future being waited for (if
        any), and shows the status as 'cancelling' while a cancellation
        request is outstanding.
        """
        info = super()._repr_info()

        if self._must_cancel:
            # replace status
            info[0] = 'cancelling'

        coro = coroutines._format_coroutine(self._coro)
        info.insert(1, 'coro=<%s>' % coro)

        if self._fut_waiter is not None:
            info.insert(2, 'wait_for=%r' % self._fut_waiter)
        return info

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        frames = []
        f = self._coro.gi_frame
        if f is not None:
            # Walk from the newest frame outwards, then flip the list so
            # the result is ordered oldest to newest.
            while f is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(f)
                f = f.f_back
            frames.reverse()
        elif self._exception is not None:
            # Traceback entries are already linked oldest to newest.
            tb = self._exception.__traceback__
            while tb is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(tb.tb_frame)
                tb = tb.tb_next
        return frames

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        extracted_list = []
        # Filenames whose linecache entry has already been revalidated.
        checked = set()
        for f in self.get_stack(limit=limit):
            lineno = f.f_lineno
            co = f.f_code
            filename = co.co_filename
            name = co.co_name
            if filename not in checked:
                checked.add(filename)
                linecache.checkcache(filename)
            line = linecache.getline(filename, lineno, f.f_globals)
            extracted_list.append((filename, lineno, name, line))
        exc = self._exception
        if not extracted_list:
            print('No stack for %r' % self, file=file)
        elif exc is not None:
            print('Traceback for %r (most recent call last):' % self,
                  file=file)
        else:
            print('Stack for %r (most recent call last):' % self,
                  file=file)
        traceback.print_list(extracted_list, file=file)
        if exc is not None:
            for line in traceback.format_exception_only(exc.__class__, exc):
                print(line, file=file, end='')

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).
        """
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, value=None, exc=None):
        """Advance the wrapped coroutine by one step.

        Throws *exc* into the coroutine if given, otherwise sends *value*
        (or calls next() when value is None).  The outcome either
        completes the task or decides how the next step gets scheduled.
        """
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
        if self._must_cancel:
            # A cancel() request is outstanding: deliver it as a
            # CancelledError unless one is already being delivered.
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        # Mark this task as the one currently running on its loop; the
        # entry is removed again in the finally clause below.
        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(value).
        try:
            if exc is not None:
                result = coro.throw(exc)
            elif value is not None:
                result = coro.send(value)
            else:
                result = next(coro)
        except StopIteration as exc:
            # The coroutine returned: its return value is the task result.
            self.set_result(exc.value)
        except futures.CancelledError as exc:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            # Record the exception on the task, but still let it
            # propagate to the caller of _step().
            self.set_exception(exc)
            raise
        else:
            if isinstance(result, futures.Future):
                # Yielded Future must come from Future.__iter__().
                if result._blocking:
                    result._blocking = False
                    result.add_done_callback(self._wakeup)
                    self._fut_waiter = result
                    if self._must_cancel:
                        # cancel() was requested while the coroutine was
                        # running; forward it to the future just yielded.
                        if self._fut_waiter.cancel():
                            self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step, None,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        """Done-callback for the future this task was waiting on.

        Resumes the coroutine through _step(), passing along either the
        future's result or the exception it raised.
        """
        try:
            value = future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(None, exc)
        else:
            self._step(value, None)
        self = None  # Needed to break cycles when an exception occurs.
295
296
# wait() and as_completed() are similar to those in PEP 3148.

# Accepted values for the return_when argument of wait(); these are the
# same objects as the constants in concurrent.futures.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
302
303
@coroutine
def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures and coroutines given by fs to complete.

    fs must be a non-empty collection; passing a single Future or
    coroutine raises TypeError and an empty collection raises
    ValueError.  return_when must be one of FIRST_COMPLETED,
    FIRST_EXCEPTION or ALL_COMPLETED, otherwise ValueError is raised.

    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = yield from asyncio.wait(fs)

    Note: This does not raise TimeoutError!  Futures that aren't done
    when the timeout occurs are returned in the second set.
    """
    got_single_awaitable = (isinstance(fs, futures.Future)
                            or coroutines.iscoroutine(fs))
    if got_single_awaitable:
        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    valid_modes = (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED)
    if return_when not in valid_modes:
        raise ValueError('Invalid return_when value: {}'.format(return_when))

    if loop is None:
        loop = events.get_event_loop()

    # De-duplicate the inputs first so that the same coroutine object is
    # never wrapped twice.
    unique_inputs = set(fs)
    wrapped = {ensure_future(item, loop=loop) for item in unique_inputs}

    outcome = yield from _wait(wrapped, timeout, return_when, loop)
    return outcome
334
335
def _release_waiter(waiter, *args):
    """Complete *waiter* with a None result unless it is already done.

    Extra positional arguments are accepted and ignored so that this can
    be used directly as a done-callback (which is called with the
    finished future) as well as a plain timer callback.
    """
    if waiter.done():
        return
    waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700339
340
@coroutine
def wait_for(fut, timeout, *, loop=None):
    """Wait for the single Future or coroutine to complete, with timeout.

    A coroutine is wrapped in a Task.

    Returns the result of the Future or coroutine.  When the timeout
    expires first, the task is cancelled and TimeoutError is raised.
    To keep the task from being cancelled, wrap it in shield().

    With timeout=None this simply waits for fut with no time limit.

    If the wait itself is cancelled, the task is cancelled as well.

    This function is a coroutine.
    """
    if loop is None:
        loop = events.get_event_loop()

    if timeout is None:
        return (yield from fut)

    # The waiter is released by whichever happens first: the timer
    # firing, or fut completing.
    waiter = futures.Future(loop=loop)
    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    release_on_done = functools.partial(_release_waiter, waiter)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(release_on_done)

    try:
        # Wait until the future completes or the timeout expires.
        try:
            yield from waiter
        except futures.CancelledError:
            # The wait was cancelled from outside: cancel fut too.
            fut.remove_done_callback(release_on_done)
            fut.cancel()
            raise

        if not fut.done():
            # The timer won the race.
            fut.remove_done_callback(release_on_done)
            fut.cancel()
            raise futures.TimeoutError()
        return fut.result()
    finally:
        timeout_handle.cancel()
385
386
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a non-empty collection of Futures.
    Returns a (done, pending) pair of sets.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = futures.Future(loop=loop)
    timeout_handle = None
    if timeout is not None:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    remaining = len(fs)

    def _on_completion(f):
        nonlocal remaining
        remaining -= 1
        # Evaluation order matters: f.exception() is only consulted for
        # FIRST_EXCEPTION and only when f was not cancelled.
        should_wake = (
            remaining <= 0
            or return_when == FIRST_COMPLETED
            or (return_when == FIRST_EXCEPTION
                and not f.cancelled()
                and f.exception() is not None))
        if not should_wake:
            return
        if timeout_handle is not None:
            timeout_handle.cancel()
        if not waiter.done():
            waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()

    # Detach the callback from every future and sort them by state.
    done = set()
    pending = set()
    for f in fs:
        f.remove_done_callback(_on_completion)
        bucket = done if f.done() else pending
        bucket.add(f)
    return done, pending
429
430
431# This is *not* a @coroutine! It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    Waiting on each yielded coroutine gives the result (or raises the
    exception!) of one of the original Futures (or coroutines), in the
    order in which, and as soon as, they complete.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = yield from f  # The 'yield from' may raise.
            # Use result.

    If a timeout is specified, the 'yield from' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
    if loop is None:
        loop = events.get_event_loop()
    outstanding = {ensure_future(f, loop=loop) for f in set(fs)}
    from .queues import Queue  # Import here to avoid circular import problem.
    finished = Queue(loop=loop)
    timeout_handle = None

    def _on_timeout():
        for fut in outstanding:
            fut.remove_done_callback(_on_completion)
            # Queue a dummy value for _wait_for_one().
            finished.put_nowait(None)
        # Can't remove entries one by one while iterating over the set.
        outstanding.clear()

    def _on_completion(fut):
        if not outstanding:
            return  # _on_timeout() was here first.
        outstanding.remove(fut)
        finished.put_nowait(fut)
        if timeout_handle is not None and not outstanding:
            timeout_handle.cancel()

    @coroutine
    def _wait_for_one():
        fut = yield from finished.get()
        if fut is None:
            # Dummy value from _on_timeout().
            raise futures.TimeoutError
        return fut.result()  # May raise fut.exception().

    for fut in outstanding:
        fut.add_done_callback(_on_completion)
    if outstanding and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    total = len(outstanding)
    for _ in range(total):
        yield _wait_for_one()
486
487
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    The value passed as *result* is returned to the caller once the
    delay has elapsed.
    """
    waiter = futures.Future(loop=loop)
    timer = waiter._loop.call_later(
        delay, waiter._set_result_unless_cancelled, result)
    try:
        outcome = yield from waiter
        return outcome
    finally:
        # Always drop the timer, e.g. when the sleep is cancelled early.
        timer.cancel()
498
499
500def async(coro_or_future, *, loop=None):
501 """Wrap a coroutine in a future.
502
503 If the argument is a Future, it is returned directly.
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400504
505 This function is deprecated in 3.5. Use asyncio.ensure_future() instead.
506 """
507
508 warnings.warn("asyncio.async() function is deprecated, use ensure_future()",
509 DeprecationWarning)
510
511 return ensure_future(coro_or_future, loop=loop)
512
513
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine in a future.

    If the argument is a Future, it is returned directly; in that case
    an explicitly given loop must be the Future's own loop, otherwise
    ValueError is raised.  A coroutine is turned into a task with
    loop.create_task().  Anything else raises TypeError.
    """
    if isinstance(coro_or_future, futures.Future):
        loop_mismatch = not (loop is None or loop is coro_or_future._loop)
        if loop_mismatch:
            raise ValueError('loop argument must agree with Future')
        return coro_or_future

    if not coroutines.iscoroutine(coro_or_future):
        raise TypeError('A Future or coroutine is required')

    if loop is None:
        loop = events.get_event_loop()
    task = loop.create_task(coro_or_future)
    if task._source_traceback:
        # Trim the newest entry from the task's recorded creation
        # traceback.
        del task._source_traceback[-1]
    return task
532
533
class _GatheringFuture(futures.Future):
    """Helper for gather().

    This overrides cancel() to cancel all the children and act more
    like Task.cancel(), which doesn't immediately mark itself as
    cancelled.
    """

    def __init__(self, children, *, loop=None):
        """Create the outer future for the given list of child futures."""
        super().__init__(loop=loop)
        self._children = children

    def cancel(self):
        """Request cancellation of every child future.

        Returns False if this future is already done or if no child
        accepted the cancellation; returns True if at least one child's
        cancel() returned True.
        """
        if self.done():
            return False
        # Report success only when a child really was cancelled.
        # Task.cancel() relies on this return value: claiming success
        # while every child is already finished would make the waiting
        # task drop its cancellation request.
        cancelled_any = False
        for child in self._children:
            # Every child must be asked, so do not short-circuit here.
            if child.cancel():
                cancelled_any = True
        return cancelled_any
552
553
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines
    or futures.

    All futures must share the same event loop.  If all the tasks are
    done successfully, the returned future's result is the list of
    results (in the order of the original sequence, not necessarily
    the order of results arrival).  If *return_exceptions* is True,
    exceptions in the tasks are treated the same as successful
    results, and gathered in the result list; otherwise, the first
    raised exception will be immediately propagated to the returned
    future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)
    """
    if not coros_or_futures:
        # Nothing to wait for: hand back an already completed future.
        outer = futures.Future(loop=loop)
        outer.set_result([])
        return outer

    # Map each distinct argument to its future, so that an argument
    # passed several times is wrapped only once.
    future_for_arg = {}
    for arg in set(coros_or_futures):
        if isinstance(arg, futures.Future):
            fut = arg
            if loop is None:
                loop = fut._loop
            elif fut._loop is not loop:
                raise ValueError("futures are tied to different event loops")
        else:
            fut = ensure_future(arg, loop=loop)
            if loop is None:
                loop = fut._loop
            # The caller cannot control this future, the "destroy pending task"
            # warning should not be emitted.
            fut._log_destroy_pending = False
        future_for_arg[arg] = fut

    children = [future_for_arg[arg] for arg in coros_or_futures]
    nchildren = len(children)
    outer = _GatheringFuture(children, loop=loop)
    nfinished = 0
    results = [None] * nchildren

    def _done_callback(i, fut):
        nonlocal nfinished
        if outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if fut.cancelled():
            outcome = futures.CancelledError()
            failed = True
        elif fut._exception is not None:
            outcome = fut.exception()  # Mark exception retrieved.
            failed = True
        else:
            outcome = fut._result
            failed = False

        if failed and not return_exceptions:
            # The first failure is propagated to the outer future at once.
            outer.set_exception(outcome)
            return

        results[i] = outcome
        nfinished += 1
        if nfinished == nchildren:
            outer.set_result(results)

    for index, child in enumerate(children):
        child.add_done_callback(functools.partial(_done_callback, index))
    return outer
630
631
def shield(arg, *, loop=None):
    """Wait for a future, shielding it from cancellation.

    The statement

        res = yield from shield(something())

    is exactly equivalent to the statement

        res = yield from something()

    *except* that if the coroutine containing it is cancelled, the
    task running in something() is not cancelled.  From the POV of
    something(), the cancellation did not happen.  But its caller is
    still cancelled, so the yield-from expression still raises
    CancelledError.  Note: If something() is cancelled by other means
    this will still cancel shield().

    If you want to completely ignore cancellation (not recommended)
    you can combine shield() with a try/except clause, as follows:

        try:
            res = yield from shield(something())
        except CancelledError:
            res = None
    """
    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Shortcut.
        return inner
    outer = futures.Future(loop=inner._loop)

    def _done_callback(inner):
        if outer.cancelled():
            # Nobody is listening any more; just make sure inner's
            # outcome counts as retrieved.
            if not inner.cancelled():
                inner.exception()
            return

        if inner.cancelled():
            outer.cancel()
            return

        error = inner.exception()
        if error is None:
            outer.set_result(inner.result())
        else:
            outer.set_exception(error)

    inner.add_done_callback(_done_callback)
    return outer