blob: c556e448c7cf3c4a143937cf4dc289de447ebd14 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
# Names exported by this module for "from ... import *".
__all__ = ['Task',
           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
           'wait', 'wait_for', 'as_completed', 'sleep', 'async',
           'gather', 'shield',
           ]
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import concurrent.futures
10import functools
11import inspect
12import linecache
Victor Stinner7ef60cd2014-02-19 23:15:02 +010013import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import traceback
15import weakref
16
Victor Stinnerf951d282014-06-29 00:46:45 +020017from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import events
19from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020020from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021
# True when the running interpreter is Python 3.4 or newer; used below to
# decide whether Task defines a __del__ method.
_PY34 = (sys.version_info >= (3, 4))
Victor Stinner8d3e02e2014-06-18 01:14:59 +020023
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024
class Task(futures.Future):
    """A Future that drives a wrapped coroutine on an event loop."""

    # Invariant that holds for as long as the Task is not done:
    #
    # - either _fut_waiter is None and a call to _step() is scheduled;
    # - or _fut_waiter is a Future and no call to _step() is scheduled.
    #
    # _wakeup() is the only way to move from the second state back to
    # the first.  Whenever _fut_waiter is not None, _wakeup() must be
    # one of its done callbacks.

    # Weak set holding every task that is still alive.
    _all_tasks = weakref.WeakSet()

    # Task currently being stepped in each running event loop,
    # stored as {EventLoop: Task}.
    _current_tasks = {}

    @classmethod
    def current_task(cls, loop=None):
        """Return the task currently running in an event loop, or None.

        When no loop is given, the current event loop is used.

        None is returned when the call is not made from within a Task.
        """
        target = events.get_event_loop() if loop is None else loop
        return cls._current_tasks.get(target)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return the set of all tasks bound to an event loop.

        When no loop is given, the current event loop is used.
        """
        target = events.get_event_loop() if loop is None else loop
        return {task for task in cls._all_tasks if task._loop is target}

    def __init__(self, coro, *, loop=None):
        # A coroutine object is required here, not a coroutine function.
        assert coroutines.iscoroutine(coro), repr(coro)
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the newest recorded entry (presumably the frame of this
            # constructor -- confirm against futures.Future).
            del self._source_traceback[-1]
        self._coro = iter(coro)  # Use the iterator just in case.
        self._fut_waiter = None
        self._must_cancel = False
        # Schedule the first step; the coroutine does not start running
        # synchronously from the constructor.
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)
        # When False, destroying the task while it is still pending does
        # not report anything to the loop's exception handler.
        self._log_destroy_pending = True

    # On Python 3.3 and older, objects that have a destructor and take part
    # in a reference cycle are never destroyed.  PEP 442 lifted that
    # restriction in Python 3.4, so __del__ is only defined there.
    if _PY34:
        def __del__(self):
            if self._state == futures._PENDING and self._log_destroy_pending:
                context = {}
                context['task'] = self
                context['message'] = 'Task was destroyed but it is pending!'
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
            futures.Future.__del__(self)

    def _repr_info(self):
        """Return the list of strings used to build the task's repr."""
        parts = super()._repr_info()

        if self._must_cancel:
            # The first item is the status; override it.
            parts[0] = 'cancelling'

        described = coroutines._format_coroutine(self._coro)
        parts.insert(1, 'coro=<%s>' % described)

        if self._fut_waiter is not None:
            parts.insert(2, 'wait_for=%r' % self._fut_waiter)
        return parts

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        While the coroutine still has a frame, the result is the stack at
        the point where it is suspended.  If it finished successfully or
        was cancelled, the result is an empty list.  If it was terminated
        by an exception, the result is the list of traceback frames.

        Frames are always ordered from oldest to newest.

        The optional limit caps the number of frames returned; by
        default every available frame is returned.  For a stack the
        newest frames are kept, for a traceback the oldest frames are
        kept.  (This matches the behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        def within_limit():
            # Spend one unit of the frame budget; False once it is used up.
            nonlocal limit
            if limit is None:
                return True
            if limit <= 0:
                return False
            limit -= 1
            return True

        frames = []
        frame = self._coro.gi_frame
        if frame is not None:
            # Walk from the newest frame outwards, then flip the order.
            while frame is not None and within_limit():
                frames.append(frame)
                frame = frame.f_back
            frames.reverse()
        elif self._exception is not None:
            # Tracebacks are already linked from oldest to newest.
            tb = self._exception.__traceback__
            while tb is not None and within_limit():
                frames.append(tb.tb_frame)
                tb = tb.tb_next
        return frames

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        The output resembles that of the traceback module, for the
        frames obtained from get_stack().  The limit argument is handed
        to get_stack().  The file argument is the I/O stream receiving
        the output; by default it goes to sys.stderr.
        """
        entries = []
        refreshed = set()
        for frame in self.get_stack(limit=limit):
            code = frame.f_code
            filename = code.co_filename
            lineno = frame.f_lineno
            if filename not in refreshed:
                # Refresh the line cache once per file.
                refreshed.add(filename)
                linecache.checkcache(filename)
            text = linecache.getline(filename, lineno, frame.f_globals)
            entries.append((filename, lineno, code.co_name, text))

        exc = self._exception
        if not entries:
            header = 'No stack for %r'
        elif exc is not None:
            header = 'Traceback for %r (most recent call last):'
        else:
            header = 'Stack for %r (most recent call last):'
        print(header % self, file=file)

        traceback.print_list(entries, file=file)
        if exc is not None:
            for line in traceback.format_exception_only(exc.__class__, exc):
                print(line, file=file, end='')

    def cancel(self):
        """Request this task to cancel itself.

        A CancelledError is arranged to be thrown into the wrapped
        coroutine on the next cycle through the event loop.  The
        coroutine then gets a chance to clean up, or even to refuse the
        request, using try/except/finally.

        Unlike Future.cancel(), this gives no guarantee that the task
        ends up cancelled: the exception may be caught and acted upon,
        which delays the cancellation or prevents it completely.  The
        task may also return a value or raise a different exception.

        Right after this call Task.cancelled() does not return True
        (unless the task was already cancelled).  The task is marked as
        cancelled once the wrapped coroutine terminates with a
        CancelledError exception (even if cancel() was never called).
        """
        if self.done():
            return False
        waiter = self._fut_waiter
        if waiter is not None and waiter.cancel():
            # _fut_waiter is left in place: it may be a Task that catches
            # and ignores the cancellation, in which case it may have to
            # be cancelled again later.
            return True
        # Otherwise self._step must already be scheduled.
        self._must_cancel = True
        return True

    def _step(self, value=None, exc=None):
        """Advance the coroutine by one step and act on what it yields."""
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
        if self._must_cancel:
            # A cancellation is pending: deliver it on this step.
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        self.__class__._current_tasks[self._loop] = self
        # Resume with coro.throw(exc), coro.send(value) or next(coro).
        try:
            if exc is not None:
                yielded = coro.throw(exc)
            elif value is not None:
                yielded = coro.send(value)
            else:
                yielded = next(coro)
        except StopIteration as exc:
            self.set_result(exc.value)
        except futures.CancelledError as exc:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            self.set_exception(exc)
            raise
        else:
            # Error to throw into the coroutine on its next step, if the
            # yielded value turns out to be unacceptable.
            bad_yield = None
            if isinstance(yielded, futures.Future):
                # A yielded Future must come from Future.__iter__().
                if yielded._blocking:
                    yielded._blocking = False
                    yielded.add_done_callback(self._wakeup)
                    self._fut_waiter = yielded
                    if self._must_cancel and self._fut_waiter.cancel():
                        self._must_cancel = False
                else:
                    bad_yield = RuntimeError(
                        'yield was used instead of yield from '
                        'in task {!r} with {!r}'.format(self, yielded))
            elif yielded is None:
                # A bare yield gives up control for one loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(yielded):
                # Yielding a generator is just wrong.
                bad_yield = RuntimeError(
                    'yield was used instead of yield from for '
                    'generator in task {!r} with {}'.format(
                        self, yielded))
            else:
                # Yielding anything else is an error as well.
                bad_yield = RuntimeError(
                    'Task got bad yield: {!r}'.format(yielded))
            if bad_yield is not None:
                self._loop.call_soon(self._step, None, bad_yield)
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        """Done callback of the awaited future: resume the coroutine."""
        try:
            outcome = future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(None, exc)
        else:
            self._step(outcome, None)
        self = None  # Needed to break cycles when an exception occurs.
292
293
# wait() and as_completed() similar to those in PEP 3148.

# The return_when values accepted by wait(), re-exported from
# concurrent.futures.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
299
300
301@coroutine
302def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
303 """Wait for the Futures and coroutines given by fs to complete.
304
Victor Stinnerdb74d982014-06-10 11:16:05 +0200305 The sequence futures must not be empty.
306
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700307 Coroutines will be wrapped in Tasks.
308
309 Returns two sets of Future: (done, pending).
310
311 Usage:
312
313 done, pending = yield from asyncio.wait(fs)
314
315 Note: This does not raise TimeoutError! Futures that aren't done
316 when the timeout occurs are returned in the second set.
317 """
Victor Stinnerf951d282014-06-29 00:46:45 +0200318 if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100319 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700320 if not fs:
321 raise ValueError('Set of coroutines/Futures is empty.')
Victor Stinnere931f7b2014-07-16 18:50:39 +0200322 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
323 raise ValueError('Invalid return_when value: {}'.format(return_when))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700324
325 if loop is None:
326 loop = events.get_event_loop()
327
Yury Selivanov622be342014-02-06 22:06:16 -0500328 fs = {async(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700329
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700330 return (yield from _wait(fs, timeout, return_when, loop))
331
332
def _release_waiter(waiter, *args):
    """Complete waiter with a None result unless it is already done.

    Extra positional arguments are accepted and ignored, which lets this
    function be used directly as a done callback.
    """
    if waiter.done():
        return
    waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700336
337
338@coroutine
339def wait_for(fut, timeout, *, loop=None):
340 """Wait for the single Future or coroutine to complete, with timeout.
341
342 Coroutine will be wrapped in Task.
343
Victor Stinner421e49b2014-01-23 17:40:59 +0100344 Returns result of the Future or coroutine. When a timeout occurs,
345 it cancels the task and raises TimeoutError. To avoid the task
346 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700347
348 Usage:
349
350 result = yield from asyncio.wait_for(fut, 10.0)
351
352 """
353 if loop is None:
354 loop = events.get_event_loop()
355
Guido van Rossum48c66c32014-01-29 14:30:38 -0800356 if timeout is None:
357 return (yield from fut)
358
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700359 waiter = futures.Future(loop=loop)
Victor Stinner59e08022014-08-28 11:19:25 +0200360 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
361 cb = functools.partial(_release_waiter, waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700362
363 fut = async(fut, loop=loop)
364 fut.add_done_callback(cb)
365
366 try:
Victor Stinner59e08022014-08-28 11:19:25 +0200367 # wait until the future completes or the timeout
368 yield from waiter
369
370 if fut.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700371 return fut.result()
372 else:
373 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100374 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700375 raise futures.TimeoutError()
376 finally:
377 timeout_handle.cancel()
378
379
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a non-empty collection of Futures.
    Returns the pair of sets (done, pending).
    """
    assert fs, 'Set of Futures is empty.'
    waiter = futures.Future(loop=loop)
    timeout_handle = None
    if timeout is not None:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    remaining = len(fs)

    def _on_completion(f):
        nonlocal remaining
        remaining -= 1
        if remaining > 0 and return_when != FIRST_COMPLETED:
            # Not everything is done and the first completion is not
            # enough: only a real failure under FIRST_EXCEPTION ends
            # the wait early.
            failed = (return_when == FIRST_EXCEPTION and
                      not f.cancelled() and
                      f.exception() is not None)
            if not failed:
                return
        if timeout_handle is not None:
            timeout_handle.cancel()
        _release_waiter(waiter)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()

    done = set()
    pending = set()
    for f in fs:
        f.remove_done_callback(_on_completion)
        bucket = done if f.done() else pending
        bucket.add(f)
    return done, pending
422
423
424# This is *not* a @coroutine! It is just an iterator (yielding Futures).
425def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800426 """Return an iterator whose values are coroutines.
427
428 When waiting for the yielded coroutines you'll get the results (or
429 exceptions!) of the original Futures (or coroutines), in the order
430 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700431
432 This differs from PEP 3148; the proper way to use this is:
433
434 for f in as_completed(fs):
435 result = yield from f # The 'yield from' may raise.
436 # Use result.
437
Guido van Rossumb58f0532014-02-12 17:58:19 -0800438 If a timeout is specified, the 'yield from' will raise
439 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700440
441 Note: The futures 'f' are not necessarily members of fs.
442 """
Victor Stinnerf951d282014-06-29 00:46:45 +0200443 if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100444 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700445 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov622be342014-02-06 22:06:16 -0500446 todo = {async(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800447 from .queues import Queue # Import here to avoid circular import problem.
448 done = Queue(loop=loop)
449 timeout_handle = None
450
451 def _on_timeout():
452 for f in todo:
453 f.remove_done_callback(_on_completion)
454 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
455 todo.clear() # Can't do todo.remove(f) in the loop.
456
457 def _on_completion(f):
458 if not todo:
459 return # _on_timeout() was here first.
460 todo.remove(f)
461 done.put_nowait(f)
462 if not todo and timeout_handle is not None:
463 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700464
465 @coroutine
466 def _wait_for_one():
Guido van Rossumb58f0532014-02-12 17:58:19 -0800467 f = yield from done.get()
468 if f is None:
469 # Dummy value from _on_timeout().
470 raise futures.TimeoutError
471 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700472
Guido van Rossumb58f0532014-02-12 17:58:19 -0800473 for f in todo:
474 f.add_done_callback(_on_completion)
475 if todo and timeout is not None:
476 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700477 for _ in range(len(todo)):
478 yield _wait_for_one()
479
480
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after delay seconds and returns result."""
    waiter = futures.Future(loop=loop)
    handle = waiter._loop.call_later(
        delay, waiter._set_result_unless_cancelled, result)
    try:
        value = yield from waiter
        return value
    finally:
        # Always drop the timer, e.g. when the sleep is cancelled.
        handle.cancel()
491
492
493def async(coro_or_future, *, loop=None):
494 """Wrap a coroutine in a future.
495
496 If the argument is a Future, it is returned directly.
497 """
498 if isinstance(coro_or_future, futures.Future):
499 if loop is not None and loop is not coro_or_future._loop:
500 raise ValueError('loop argument must agree with Future')
501 return coro_or_future
Victor Stinnerf951d282014-06-29 00:46:45 +0200502 elif coroutines.iscoroutine(coro_or_future):
Victor Stinner896a25a2014-07-08 11:29:25 +0200503 if loop is None:
504 loop = events.get_event_loop()
505 task = loop.create_task(coro_or_future)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200506 if task._source_traceback:
507 del task._source_traceback[-1]
508 return task
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700509 else:
510 raise TypeError('A Future or coroutine is required')
511
512
class _GatheringFuture(futures.Future):
    """Helper for gather().

    cancel() is overridden so that it cancels every child and behaves
    more like Task.cancel(): the future is not immediately marked as
    cancelled.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children

    def cancel(self):
        """Cancel all children; return False if already done, else True."""
        if self.done():
            return False
        for fut in self._children:
            fut.cancel()
        return True
531
532
533def gather(*coros_or_futures, loop=None, return_exceptions=False):
534 """Return a future aggregating results from the given coroutines
535 or futures.
536
537 All futures must share the same event loop. If all the tasks are
538 done successfully, the returned future's result is the list of
539 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500540 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700541 exceptions in the tasks are treated the same as successful
542 results, and gathered in the result list; otherwise, the first
543 raised exception will be immediately propagated to the returned
544 future.
545
546 Cancellation: if the outer Future is cancelled, all children (that
547 have not completed yet) are also cancelled. If any child is
548 cancelled, this is treated as if it raised CancelledError --
549 the outer Future is *not* cancelled in this case. (This is to
550 prevent the cancellation of one child to cause other children to
551 be cancelled.)
552 """
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200553 if not coros_or_futures:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700554 outer = futures.Future(loop=loop)
555 outer.set_result([])
556 return outer
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200557
558 arg_to_fut = {}
559 for arg in set(coros_or_futures):
560 if not isinstance(arg, futures.Future):
561 fut = async(arg, loop=loop)
562 if loop is None:
563 loop = fut._loop
564 # The caller cannot control this future, the "destroy pending task"
565 # warning should not be emitted.
566 fut._log_destroy_pending = False
567 else:
568 fut = arg
569 if loop is None:
570 loop = fut._loop
571 elif fut._loop is not loop:
572 raise ValueError("futures are tied to different event loops")
573 arg_to_fut[arg] = fut
574
575 children = [arg_to_fut[arg] for arg in coros_or_futures]
576 nchildren = len(children)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700577 outer = _GatheringFuture(children, loop=loop)
578 nfinished = 0
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200579 results = [None] * nchildren
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700580
581 def _done_callback(i, fut):
582 nonlocal nfinished
583 if outer._state != futures._PENDING:
584 if fut._exception is not None:
585 # Mark exception retrieved.
586 fut.exception()
587 return
588 if fut._state == futures._CANCELLED:
589 res = futures.CancelledError()
590 if not return_exceptions:
591 outer.set_exception(res)
592 return
593 elif fut._exception is not None:
594 res = fut.exception() # Mark exception retrieved.
595 if not return_exceptions:
596 outer.set_exception(res)
597 return
598 else:
599 res = fut._result
600 results[i] = res
601 nfinished += 1
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200602 if nfinished == nchildren:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700603 outer.set_result(results)
604
605 for i, fut in enumerate(children):
606 fut.add_done_callback(functools.partial(_done_callback, i))
607 return outer
608
609
610def shield(arg, *, loop=None):
611 """Wait for a future, shielding it from cancellation.
612
613 The statement
614
615 res = yield from shield(something())
616
617 is exactly equivalent to the statement
618
619 res = yield from something()
620
621 *except* that if the coroutine containing it is cancelled, the
622 task running in something() is not cancelled. From the POV of
623 something(), the cancellation did not happen. But its caller is
624 still cancelled, so the yield-from expression still raises
625 CancelledError. Note: If something() is cancelled by other means
626 this will still cancel shield().
627
628 If you want to completely ignore cancellation (not recommended)
629 you can combine shield() with a try/except clause, as follows:
630
631 try:
632 res = yield from shield(something())
633 except CancelledError:
634 res = None
635 """
636 inner = async(arg, loop=loop)
637 if inner.done():
638 # Shortcut.
639 return inner
640 loop = inner._loop
641 outer = futures.Future(loop=loop)
642
643 def _done_callback(inner):
644 if outer.cancelled():
645 # Mark inner's result as retrieved.
646 inner.cancelled() or inner.exception()
647 return
648 if inner.cancelled():
649 outer.cancel()
650 else:
651 exc = inner.exception()
652 if exc is not None:
653 outer.set_exception(exc)
654 else:
655 outer.set_result(inner.result())
656
657 inner.add_done_callback(_done_callback)
658 return outer