blob: d8193ba48e1e1e6332bef9b1174a284c31eb5ed8 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
# Public names exported by ``from <this module> import *``.
__all__ = [
    'Task',
    'FIRST_COMPLETED',
    'FIRST_EXCEPTION',
    'ALL_COMPLETED',
    'wait',
    'wait_for',
    'as_completed',
    'sleep',
    'async',
    'gather',
    'shield',
    'ensure_future',
]
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import concurrent.futures
10import functools
11import inspect
12import linecache
Victor Stinner7ef60cd2014-02-19 23:15:02 +010013import sys
Yury Selivanov1af2bf72015-05-11 22:27:25 -040014import types
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015import traceback
Yury Selivanov59eb9a42015-05-11 14:48:38 -040016import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017import weakref
18
Victor Stinnerf951d282014-06-29 00:46:45 +020019from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import events
21from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020022from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
# True when running on Python 3.4 or newer; used to decide whether Task
# may define a __del__ method.
_PY34 = sys.version_info >= (3, 4)
Victor Stinner8d3e02e2014-06-18 01:14:59 +020025
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026
class Task(futures.Future):
    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # Weak set containing all tasks alive.
    _all_tasks = weakref.WeakSet()

    # Dictionary containing tasks that are currently active in
    # all running event loops.  {EventLoop: Task}
    _current_tasks = {}

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        if loop is None:
            loop = events.get_event_loop()
        return cls._current_tasks.get(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        if loop is None:
            loop = events.get_event_loop()
        return {t for t in cls._all_tasks if t._loop is loop}

    def __init__(self, coro, *, loop=None):
        """Wrap *coro* and schedule its first step on the event loop.

        *coro* must satisfy coroutines.iscoroutine(); *loop* is passed on
        to the Future constructor.
        """
        assert coroutines.iscoroutine(coro), repr(coro)
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the newest recorded frame (presumably this constructor's
            # own frame -- confirm against Future.__init__).
            del self._source_traceback[-1]
        self._coro = coro
        self._fut_waiter = None
        self._must_cancel = False
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    # On Python 3.3 or older, objects with a destructor that are part of a
    # reference cycle are never destroyed. That's not the case any more on
    # Python 3.4 thanks to the PEP 442.
    if _PY34:
        def __del__(self):
            """Report a task that is destroyed while still pending."""
            if self._state == futures._PENDING and self._log_destroy_pending:
                context = {
                    'task': self,
                    'message': 'Task was destroyed but it is pending!',
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
            futures.Future.__del__(self)

    def _repr_info(self):
        """Return the repr components, extended with task-specific entries."""
        info = super()._repr_info()

        if self._must_cancel:
            # replace status
            info[0] = 'cancelling'

        coro = coroutines._format_coroutine(self._coro)
        info.insert(1, 'coro=<%s>' % coro)

        if self._fut_waiter is not None:
            info.insert(2, 'wait_for=%r' % self._fut_waiter)
        return info

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        frames = []
        try:
            # Native ('async def') coroutine objects expose cr_frame.
            f = self._coro.cr_frame
        except AttributeError:
            # Generator-based coroutines expose gi_frame instead.
            f = self._coro.gi_frame
        if f is not None:
            while f is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(f)
                f = f.f_back
            # Frames were collected newest-first; report oldest-first.
            frames.reverse()
        elif self._exception is not None:
            tb = self._exception.__traceback__
            while tb is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(tb.tb_frame)
                tb = tb.tb_next
        return frames

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        extracted_list = []
        checked = set()
        for f in self.get_stack(limit=limit):
            lineno = f.f_lineno
            co = f.f_code
            filename = co.co_filename
            name = co.co_name
            if filename not in checked:
                # Refresh linecache once per file before reading from it.
                checked.add(filename)
                linecache.checkcache(filename)
            line = linecache.getline(filename, lineno, f.f_globals)
            extracted_list.append((filename, lineno, name, line))
        exc = self._exception
        if not extracted_list:
            print('No stack for %r' % self, file=file)
        elif exc is not None:
            print('Traceback for %r (most recent call last):' % self,
                  file=file)
        else:
            print('Stack for %r (most recent call last):' % self,
                  file=file)
        traceback.print_list(extracted_list, file=file)
        if exc is not None:
            for line in traceback.format_exception_only(exc.__class__, exc):
                print(line, file=file, end='')

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).
        """
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, value=None, exc=None):
        """Advance the coroutine once and act on what it yields or raises.

        *exc*, when not None, is thrown into the coroutine; otherwise
        *value* is sent.  A pending cancellation request replaces *exc*
        with a CancelledError.
        """
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(value).
        try:
            if exc is not None:
                result = coro.throw(exc)
            else:
                result = coro.send(value)
        except StopIteration as exc:
            self.set_result(exc.value)
        except futures.CancelledError as exc:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            self.set_exception(exc)
            raise
        else:
            if isinstance(result, futures.Future):
                # Yielded Future must come from Future.__iter__().
                if result._blocking:
                    result._blocking = False
                    result.add_done_callback(self._wakeup)
                    self._fut_waiter = result
                    if self._must_cancel:
                        if self._fut_waiter.cancel():
                            self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step, None,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        """Done-callback for the awaited future: resume with its outcome."""
        try:
            value = future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(None, exc)
        else:
            self._step(value, None)
        self = None  # Needed to break cycles when an exception occurs.
294
295
# wait() and as_completed() similar to those in PEP 3148.
# The return_when constants are taken straight from concurrent.futures.
FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED = (
    concurrent.futures.FIRST_COMPLETED,
    concurrent.futures.FIRST_EXCEPTION,
    concurrent.futures.ALL_COMPLETED,
)
301
302
@coroutine
def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait until the Futures and coroutines in fs satisfy return_when.

    fs must be a non-empty collection; passing a single Future or
    coroutine raises TypeError, and an empty collection or an unknown
    return_when raises ValueError.

    Coroutines are wrapped in Tasks via ensure_future().

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = yield from asyncio.wait(fs)

    Note: This does not raise TimeoutError!  Futures that aren't done
    when the timeout occurs are returned in the second set.
    """
    if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    accepted_modes = (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED)
    if return_when not in accepted_modes:
        raise ValueError('Invalid return_when value: {}'.format(return_when))

    if loop is None:
        loop = events.get_event_loop()

    # De-duplicate the inputs first, then make sure each one is a Future.
    wrapped = set()
    for item in set(fs):
        wrapped.add(ensure_future(item, loop=loop))

    outcome = yield from _wait(wrapped, timeout, return_when, loop)
    return outcome
333
334
def _release_waiter(waiter, *args):
    """Complete *waiter* with None unless it is already done.

    Extra positional arguments are accepted and ignored, so this can be
    used directly as a done-callback.
    """
    if waiter.done():
        return
    waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700338
339
@coroutine
def wait_for(fut, timeout, *, loop=None):
    """Wait for the single Future or coroutine to complete, with timeout.

    Coroutine will be wrapped in Task.

    Returns result of the Future or coroutine.  When a timeout occurs,
    it cancels the task and raises TimeoutError.  To avoid the task
    cancellation, wrap it in shield().

    If the wait is cancelled, the task is also cancelled.

    If timeout is None, this simply yields from fut with no timer.

    This function is a coroutine.
    """
    if loop is None:
        loop = events.get_event_loop()

    if timeout is None:
        return (yield from fut)

    # Validate/wrap the argument *before* arming the timer: if
    # ensure_future() raises, no timer handle is left scheduled behind.
    fut = ensure_future(fut, loop=loop)

    waiter = futures.Future(loop=loop)
    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    cb = functools.partial(_release_waiter, waiter)
    fut.add_done_callback(cb)

    try:
        # wait until the future completes or the timeout
        try:
            yield from waiter
        except futures.CancelledError:
            # The wait itself was cancelled: propagate it to the future.
            fut.remove_done_callback(cb)
            fut.cancel()
            raise

        if fut.done():
            return fut.result()
        else:
            # The waiter was released by the timer, not by the future.
            fut.remove_done_callback(cb)
            fut.cancel()
            raise futures.TimeoutError()
    finally:
        timeout_handle.cancel()
384
385
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a non-empty collection of Futures.
    Returns a (done, pending) pair of sets.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = futures.Future(loop=loop)
    if timeout is None:
        timer = None
    else:
        timer = loop.call_later(timeout, _release_waiter, waiter)
    remaining = len(fs)

    def _on_completion(f):
        nonlocal remaining
        remaining -= 1
        if remaining > 0 and return_when != FIRST_COMPLETED:
            # Futures are still outstanding; only a future that failed
            # under FIRST_EXCEPTION may release the waiter early.
            if return_when != FIRST_EXCEPTION:
                return
            if f.cancelled() or f.exception() is None:
                return
        if timer is not None:
            timer.cancel()
        if not waiter.done():
            waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timer is not None:
            timer.cancel()

    done = set()
    pending = set()
    for f in fs:
        f.remove_done_callback(_on_completion)
        bucket = done if f.done() else pending
        bucket.add(f)
    return done, pending
428
429
# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    Waiting on each yielded coroutine gives the result (or raises the
    exception!) of one of the original Futures (or coroutines), in the
    order in which and as soon as they complete.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = yield from f  # The 'yield from' may raise.
            # Use result.

    If a timeout is specified, the 'yield from' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
    if loop is None:
        loop = events.get_event_loop()
    outstanding = {ensure_future(f, loop=loop) for f in set(fs)}
    from .queues import Queue  # Import here to avoid circular import problem.
    finished = Queue(loop=loop)
    timeout_handle = None

    def _on_timeout():
        for f in outstanding:
            f.remove_done_callback(_on_completion)
            # Queue a dummy value for _wait_for_one().
            finished.put_nowait(None)
        # Can't remove entries one by one while iterating above.
        outstanding.clear()

    def _on_completion(f):
        if not outstanding:
            return  # _on_timeout() was here first.
        outstanding.remove(f)
        finished.put_nowait(f)
        if timeout_handle is not None and not outstanding:
            timeout_handle.cancel()

    @coroutine
    def _wait_for_one():
        f = yield from finished.get()
        if f is None:
            # Dummy value from _on_timeout().
            raise futures.TimeoutError
        return f.result()  # May raise f.exception().

    for f in outstanding:
        f.add_done_callback(_on_completion)
    if outstanding and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    # The number of coroutines to hand out is fixed up front; the set
    # itself shrinks as futures complete.
    total = len(outstanding)
    for _ in range(total):
        yield _wait_for_one()
485
486
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    The value of *result* is what the coroutine returns once the delay
    has elapsed.
    """
    future = futures.Future(loop=loop)
    timer = future._loop.call_later(
        delay, future._set_result_unless_cancelled, result)
    try:
        outcome = yield from future
        return outcome
    finally:
        # Always drop the timer, e.g. when the sleep is interrupted.
        timer.cancel()
497
498
def async_(coro_or_future, *, loop=None):
    """Wrap a coroutine in a future.

    If the argument is a Future, it is returned directly.

    This function is deprecated in 3.5. Use asyncio.ensure_future() instead.
    """

    # stacklevel=2 attributes the warning to the caller's line.
    warnings.warn("asyncio.async() function is deprecated, use ensure_future()",
                  DeprecationWarning,
                  stacklevel=2)

    return ensure_future(coro_or_future, loop=loop)


# 'async' is a reserved keyword on newer Python versions, so the function
# cannot be defined with 'def async(...)'.  Publish it under its public
# name through the module namespace instead.
globals()['async'] = async_
async_.__name__ = 'async'
del async_
511
512
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine in a future.

    If the argument is a Future, it is returned directly (a ValueError
    is raised if an explicit *loop* differs from the Future's loop).
    A coroutine is turned into a task with loop.create_task().
    Anything else raises TypeError.
    """
    if isinstance(coro_or_future, futures.Future):
        if not (loop is None or loop is coro_or_future._loop):
            raise ValueError('loop argument must agree with Future')
        return coro_or_future

    if not coroutines.iscoroutine(coro_or_future):
        raise TypeError('A Future or coroutine is required')

    if loop is None:
        loop = events.get_event_loop()
    task = loop.create_task(coro_or_future)
    if task._source_traceback:
        # Drop the newest entry of the recorded creation traceback.
        del task._source_traceback[-1]
    return task
531
532
class _GatheringFuture(futures.Future):
    """Helper for gather().

    cancel() is overridden to cancel every child, behaving more like
    Task.cancel(), which doesn't immediately mark itself as cancelled.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        # The futures whose results are being aggregated.
        self._children = children

    def cancel(self):
        """Cancel all children; return False if already done, else True."""
        if not self.done():
            for child in self._children:
                child.cancel()
            return True
        return False
551
552
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines
    or futures.

    All futures must share the same event loop.  If all the tasks are
    done successfully, the returned future's result is the list of
    results (in the order of the original sequence, not necessarily
    the order of results arrival).  If *return_exceptions* is True,
    exceptions in the tasks are treated the same as successful
    results, and gathered in the result list; otherwise, the first
    raised exception will be immediately propagated to the returned
    future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)
    """
    if not coros_or_futures:
        empty = futures.Future(loop=loop)
        empty.set_result([])
        return empty

    # Map each distinct argument to its future, so an argument passed
    # several times is wrapped only once.
    arg_to_fut = {}
    for arg in set(coros_or_futures):
        if isinstance(arg, futures.Future):
            fut = arg
            if loop is None:
                loop = fut._loop
            elif fut._loop is not loop:
                raise ValueError("futures are tied to different event loops")
        else:
            fut = ensure_future(arg, loop=loop)
            if loop is None:
                loop = fut._loop
            # The caller cannot control this future, the "destroy pending task"
            # warning should not be emitted.
            fut._log_destroy_pending = False
        arg_to_fut[arg] = fut

    children = [arg_to_fut[arg] for arg in coros_or_futures]
    nchildren = len(children)
    outer = _GatheringFuture(children, loop=loop)
    nfinished = 0
    results = [None] * nchildren

    def _done_callback(i, fut):
        nonlocal nfinished
        if outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if fut.cancelled():
            outcome = futures.CancelledError()
            failed = True
        elif fut._exception is not None:
            outcome = fut.exception()  # Mark exception retrieved.
            failed = True
        else:
            outcome = fut._result
            failed = False

        if failed and not return_exceptions:
            outer.set_exception(outcome)
            return

        results[i] = outcome
        nfinished += 1
        if nfinished == nchildren:
            outer.set_result(results)

    for index, child in enumerate(children):
        child.add_done_callback(functools.partial(_done_callback, index))
    return outer
629
630
def shield(arg, *, loop=None):
    """Wait for a future, shielding it from cancellation.

    The statement

        res = yield from shield(something())

    is exactly equivalent to the statement

        res = yield from something()

    *except* that if the coroutine containing it is cancelled, the
    task running in something() is not cancelled.  From the POV of
    something(), the cancellation did not happen.  But its caller is
    still cancelled, so the yield-from expression still raises
    CancelledError.  Note: If something() is cancelled by other means
    this will still cancel shield().

    If you want to completely ignore cancellation (not recommended)
    you can combine shield() with a try/except clause, as follows:

        try:
            res = yield from shield(something())
        except CancelledError:
            res = None
    """
    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Shortcut.
        return inner
    outer = futures.Future(loop=inner._loop)

    def _done_callback(inner):
        if outer.cancelled():
            if not inner.cancelled():
                # Mark inner's result as retrieved.
                inner.exception()
            return

        if inner.cancelled():
            outer.cancel()
            return

        # Mirror inner's outcome (exception or result) onto outer.
        exc = inner.exception()
        if exc is None:
            outer.set_result(inner.result())
        else:
            outer.set_exception(exc)

    inner.add_done_callback(_done_callback)
    return outer