blob: c6c22dd29f2ba0f339633e7f956785313762cecc [file] [log] [blame]
"""Support for tasks, coroutines and the scheduler."""

# Public names of this module; each one is defined further down in
# this file.
__all__ = ['coroutine', 'Task',
           'iscoroutinefunction', 'iscoroutine',
           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
           'wait', 'wait_for', 'as_completed', 'sleep', 'async',
           'gather', 'shield',
           ]
9
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010import concurrent.futures
11import functools
12import inspect
13import linecache
Victor Stinner7ef60cd2014-02-19 23:15:02 +010014import os
15import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import traceback
17import weakref
18
19from . import events
20from . import futures
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
# If you set _DEBUG to true, @coroutine will wrap the resulting
# generator objects in a CoroWrapper instance (defined below).  That
# instance will log a message when the generator is never iterated
# over, which may happen when you forget to use "yield from" with a
# coroutine call.  Note that the value of the _DEBUG flag is taken
# when the decorator is used, so to be of any use it must be set
# before you define your coroutines.  A downside of using this feature
# is that tracebacks show entries for the CoroWrapper.__next__ method
# when _DEBUG is true.
#
# The flag defaults to true when the PYTHONASYNCIODEBUG environment
# variable is set to a non-empty value, unless the interpreter was
# told to ignore the environment (sys.flags.ignore_environment).
_DEBUG = (not sys.flags.ignore_environment
          and bool(os.environ.get('PYTHONASYNCIODEBUG')))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070034
35
class CoroWrapper:
    # Wrapper for coroutine in _DEBUG mode.
    #
    # It forwards the generator protocol (__next__/send/throw/close and
    # the gi_* attributes) to the wrapped generator and, when destroyed,
    # logs an error if the generator was never started.
    #
    # NOTE: deliberately no class docstring: '__doc__' is listed in
    # __slots__, and a class-level __doc__ would conflict with that slot.

    __slots__ = ['gen', 'func', '__name__', '__doc__']

    def __init__(self, gen, func):
        # gen:  the generator object being wrapped.
        # func: the function that produced it (used for diagnostics).
        assert inspect.isgenerator(gen), gen
        self.gen = gen
        self.func = func

    def __iter__(self):
        return self

    def __next__(self):
        return next(self.gen)

    def send(self, *value):
        # We use `*value` because of a bug in CPythons prior
        # to 3.4.1.  See issue #21209 and test_yield_from_corowrapper
        # for details.  This workaround should be removed in 3.5.0.
        if len(value) == 1:
            value = value[0]
        return self.gen.send(value)

    def throw(self, exc, value=None, tb=None):
        # Mirror the full generator.throw(type[, value[, traceback]])
        # signature so that a throw() delegated with several arguments
        # does not fail with TypeError.  The one-argument form is
        # forwarded unchanged.
        if value is None and tb is None:
            return self.gen.throw(exc)
        return self.gen.throw(exc, value, tb)

    def close(self):
        return self.gen.close()

    @property
    def gi_frame(self):
        return self.gen.gi_frame

    @property
    def gi_running(self):
        return self.gen.gi_running

    @property
    def gi_code(self):
        return self.gen.gi_code

    def __del__(self):
        # Be careful accessing self.gen: __init__ may have failed before
        # the slot was assigned, and __del__ still runs in that case.
        gen = getattr(self, 'gen', None)
        frame = getattr(gen, 'gi_frame', None)
        # f_lasti == -1 means no instruction of the generator was ever
        # executed, i.e. nobody iterated over the coroutine.
        if frame is not None and frame.f_lasti == -1:
            func = self.func
            code = func.__code__
            filename = code.co_filename
            lineno = code.co_firstlineno
            logger.error(
                'Coroutine %r defined at %s:%s was never yielded from',
                func.__name__, filename, lineno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070088
89
def coroutine(func):
    """Decorator to mark coroutines.

    A plain (non-generator) function is adapted into a generator
    function; if it returns a Future or a generator, that object is
    delegated to with "yield from".

    In _DEBUG mode every call returns a CoroWrapper, so that an error
    message is logged if the coroutine is not yielded from before it
    is destroyed.
    """
    if not inspect.isgeneratorfunction(func):
        @functools.wraps(func)
        def coro(*args, **kw):
            outcome = func(*args, **kw)
            needs_delegation = (isinstance(outcome, futures.Future) or
                                inspect.isgenerator(outcome))
            if needs_delegation:
                outcome = yield from outcome
            return outcome
    else:
        coro = func

    if _DEBUG:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            wrapped = CoroWrapper(coro(*args, **kwds), func)
            wrapped.__name__ = coro.__name__
            wrapped.__doc__ = coro.__doc__
            return wrapped
    else:
        wrapper = coro

    # Marker looked up by iscoroutinefunction().
    wrapper._is_coroutine = True
    return wrapper
118
119
def iscoroutinefunction(func):
    """Return True if func is a decorated coroutine function.

    The answer is the value of the '_is_coroutine' marker attribute
    set by @coroutine, or False when func has no such attribute.
    """
    try:
        return func._is_coroutine
    except AttributeError:
        return False
123
124
def iscoroutine(obj):
    """Return True if obj is a coroutine object.

    That is, either a CoroWrapper instance or a generator object.
    """
    if isinstance(obj, CoroWrapper):
        return True
    return inspect.isgenerator(obj)
128
129
class Task(futures.Future):
    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task is not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # Weak set containing all tasks alive.
    _all_tasks = weakref.WeakSet()

    # Dictionary containing tasks that are currently active in
    # all running event loops.  {EventLoop: Task}
    _current_tasks = {}

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        if loop is None:
            loop = events.get_event_loop()
        return cls._current_tasks.get(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        if loop is None:
            loop = events.get_event_loop()
        return {t for t in cls._all_tasks if t._loop is loop}

    def __init__(self, coro, *, loop=None):
        # Wrap the coroutine object `coro`, schedule its first _step()
        # on the loop and register the task in _all_tasks.
        assert iscoroutine(coro), repr(coro)  # Not a coroutine function!
        super().__init__(loop=loop)
        self._coro = iter(coro)  # Use the iterator just in case.
        # Future the coroutine is currently waiting on (see the class
        # invariant above).
        self._fut_waiter = None
        # Set by cancel() when the request has to be delivered by the
        # next _step().
        self._must_cancel = False
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    def __repr__(self):
        # Start from Future's repr, show CANCELLING instead of PENDING
        # while a cancellation request is outstanding, and insert
        # '(<coroutine name>)' in front of the first '<'.
        res = super().__repr__()
        if (self._must_cancel and
            self._state == futures._PENDING and
            '<PENDING' in res):
            res = res.replace('<PENDING', '<CANCELLING', 1)
        i = res.find('<')
        if i < 0:
            # No '<' at all: append the coroutine name at the end.
            i = len(res)
        res = res[:i] + '(<{}>)'.format(self._coro.__name__) + res[i:]
        return res

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is active, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        frames = []
        f = self._coro.gi_frame
        if f is not None:
            # Walk from the newest frame towards the oldest one ...
            while f is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(f)
                f = f.f_back
            # ... then flip the list so it is ordered oldest first.
            frames.reverse()
        elif self._exception is not None:
            # No live frame: use the frames recorded in the traceback
            # of the stored exception (already oldest first).
            tb = self._exception.__traceback__
            while tb is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(tb.tb_frame)
                tb = tb.tb_next
        return frames

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output goes; by default it goes to sys.stderr.
        """
        extracted_list = []
        # File names whose linecache entry was already revalidated.
        checked = set()
        for f in self.get_stack(limit=limit):
            lineno = f.f_lineno
            co = f.f_code
            filename = co.co_filename
            name = co.co_name
            if filename not in checked:
                checked.add(filename)
                linecache.checkcache(filename)
            line = linecache.getline(filename, lineno, f.f_globals)
            extracted_list.append((filename, lineno, name, line))
        exc = self._exception
        if not extracted_list:
            print('No stack for %r' % self, file=file)
        elif exc is not None:
            print('Traceback for %r (most recent call last):' % self,
                  file=file)
        else:
            print('Stack for %r (most recent call last):' % self,
                  file=file)
        traceback.print_list(extracted_list, file=file)
        if exc is not None:
            for line in traceback.format_exception_only(exc.__class__, exc):
                print(line, file=file, end='')

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Contrary to Future.cancel(), this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing it
        completely.  The task may also return a value or raise a
        different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Return False if the task is already done, True otherwise.
        """
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, value=None, exc=None):
        # Run the coroutine until its next yield (or its end).
        #
        # If exc is not None it is thrown into the coroutine; otherwise
        # value is sent into it when not None; otherwise the coroutine
        # is simply advanced with next().
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
        if self._must_cancel:
            # Deliver the pending cancellation request instead of
            # whatever was passed in (unless that already is a
            # CancelledError).
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        # Register as the loop's current task for the duration of the
        # step; removed again in the finally clause below.
        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(value).
        try:
            if exc is not None:
                result = coro.throw(exc)
            elif value is not None:
                result = coro.send(value)
            else:
                result = next(coro)
        except StopIteration as exc:
            # The coroutine returned: its return value is the result.
            self.set_result(exc.value)
        except futures.CancelledError as exc:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            # Record the exception on the task, but let it propagate
            # to the caller as well.
            self.set_exception(exc)
            raise
        else:
            if isinstance(result, futures.Future):
                # Yielded Future must come from Future.__iter__().
                if result._blocking:
                    result._blocking = False
                    result.add_done_callback(self._wakeup)
                    self._fut_waiter = result
                    if self._must_cancel:
                        # cancel() was requested during this step:
                        # forward it to the future we now wait on.
                        if self._fut_waiter.cancel():
                            self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step, None,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        # Done-callback installed by _step() on the future the coroutine
        # waits for: resume the coroutine with the future's result, or
        # with the exception it raised.
        try:
            value = future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(None, exc)
        else:
            self._step(value, None)
        self = None  # Needed to break cycles when an exception occurs.
375
376
# wait() and as_completed() similar to those in PEP 3148.

# Values accepted for the return_when argument of wait(); they are the
# very same objects as the constants of concurrent.futures.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
382
383
384@coroutine
385def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
386 """Wait for the Futures and coroutines given by fs to complete.
387
388 Coroutines will be wrapped in Tasks.
389
390 Returns two sets of Future: (done, pending).
391
392 Usage:
393
394 done, pending = yield from asyncio.wait(fs)
395
396 Note: This does not raise TimeoutError! Futures that aren't done
397 when the timeout occurs are returned in the second set.
398 """
Victor Stinnereb748762014-02-11 11:54:08 +0100399 if isinstance(fs, futures.Future) or iscoroutine(fs):
400 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700401 if not fs:
402 raise ValueError('Set of coroutines/Futures is empty.')
403
404 if loop is None:
405 loop = events.get_event_loop()
406
Yury Selivanov622be342014-02-06 22:06:16 -0500407 fs = {async(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700408
409 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
410 raise ValueError('Invalid return_when value: {}'.format(return_when))
411 return (yield from _wait(fs, timeout, return_when, loop))
412
413
414def _release_waiter(waiter, value=True, *args):
415 if not waiter.done():
416 waiter.set_result(value)
417
418
419@coroutine
420def wait_for(fut, timeout, *, loop=None):
421 """Wait for the single Future or coroutine to complete, with timeout.
422
423 Coroutine will be wrapped in Task.
424
Victor Stinner421e49b2014-01-23 17:40:59 +0100425 Returns result of the Future or coroutine. When a timeout occurs,
426 it cancels the task and raises TimeoutError. To avoid the task
427 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700428
429 Usage:
430
431 result = yield from asyncio.wait_for(fut, 10.0)
432
433 """
434 if loop is None:
435 loop = events.get_event_loop()
436
Guido van Rossum48c66c32014-01-29 14:30:38 -0800437 if timeout is None:
438 return (yield from fut)
439
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700440 waiter = futures.Future(loop=loop)
441 timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False)
442 cb = functools.partial(_release_waiter, waiter, True)
443
444 fut = async(fut, loop=loop)
445 fut.add_done_callback(cb)
446
447 try:
448 if (yield from waiter):
449 return fut.result()
450 else:
451 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100452 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700453 raise futures.TimeoutError()
454 finally:
455 timeout_handle.cancel()
456
457
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a non-empty collection of Futures.
    Returns the pair of sets (done, pending).
    """
    assert fs, 'Set of Futures is empty.'
    waiter = futures.Future(loop=loop)
    if timeout is None:
        timer = None
    else:
        timer = loop.call_later(timeout, _release_waiter, waiter)
    remaining = len(fs)

    def _on_completion(f):
        nonlocal remaining
        remaining -= 1
        if remaining > 0 and return_when != FIRST_COMPLETED:
            # Futures are still outstanding and we are not in
            # FIRST_COMPLETED mode: only a future that failed with an
            # exception in FIRST_EXCEPTION mode may end the wait early.
            if return_when != FIRST_EXCEPTION or f.cancelled():
                return
            if f.exception() is None:
                return
        if timer is not None:
            timer.cancel()
        if not waiter.done():
            waiter.set_result(False)

    for fut in fs:
        fut.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timer is not None:
            timer.cancel()

    done = set()
    pending = set()
    for fut in fs:
        fut.remove_done_callback(_on_completion)
        (done if fut.done() else pending).add(fut)
    return done, pending
500
501
502# This is *not* a @coroutine! It is just an iterator (yielding Futures).
503def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800504 """Return an iterator whose values are coroutines.
505
506 When waiting for the yielded coroutines you'll get the results (or
507 exceptions!) of the original Futures (or coroutines), in the order
508 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700509
510 This differs from PEP 3148; the proper way to use this is:
511
512 for f in as_completed(fs):
513 result = yield from f # The 'yield from' may raise.
514 # Use result.
515
Guido van Rossumb58f0532014-02-12 17:58:19 -0800516 If a timeout is specified, the 'yield from' will raise
517 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700518
519 Note: The futures 'f' are not necessarily members of fs.
520 """
Victor Stinnereb748762014-02-11 11:54:08 +0100521 if isinstance(fs, futures.Future) or iscoroutine(fs):
522 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700523 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov622be342014-02-06 22:06:16 -0500524 todo = {async(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800525 from .queues import Queue # Import here to avoid circular import problem.
526 done = Queue(loop=loop)
527 timeout_handle = None
528
529 def _on_timeout():
530 for f in todo:
531 f.remove_done_callback(_on_completion)
532 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
533 todo.clear() # Can't do todo.remove(f) in the loop.
534
535 def _on_completion(f):
536 if not todo:
537 return # _on_timeout() was here first.
538 todo.remove(f)
539 done.put_nowait(f)
540 if not todo and timeout_handle is not None:
541 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700542
543 @coroutine
544 def _wait_for_one():
Guido van Rossumb58f0532014-02-12 17:58:19 -0800545 f = yield from done.get()
546 if f is None:
547 # Dummy value from _on_timeout().
548 raise futures.TimeoutError
549 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700550
Guido van Rossumb58f0532014-02-12 17:58:19 -0800551 for f in todo:
552 f.add_done_callback(_on_completion)
553 if todo and timeout is not None:
554 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700555 for _ in range(len(todo)):
556 yield _wait_for_one()
557
558
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    delay is the time to wait; result is the value returned to the
    caller once the delay has elapsed.  loop is the event loop to
    use, passed on to the underlying Future.
    """
    future = futures.Future(loop=loop)

    def _set_result_unless_done():
        # The future may already have been cancelled (for instance by
        # cancelling the sleeping task) by a callback that ran earlier
        # in the same loop iteration, before the finally clause below
        # had a chance to cancel this timer.  An unconditional
        # set_result() would raise InvalidStateError in that case.
        if not future.done():
            future.set_result(result)

    h = future._loop.call_later(delay, _set_result_unless_done)
    try:
        return (yield from future)
    finally:
        # No-op if the timer already fired; otherwise drop it.
        h.cancel()
568
569
570def async(coro_or_future, *, loop=None):
571 """Wrap a coroutine in a future.
572
573 If the argument is a Future, it is returned directly.
574 """
575 if isinstance(coro_or_future, futures.Future):
576 if loop is not None and loop is not coro_or_future._loop:
577 raise ValueError('loop argument must agree with Future')
578 return coro_or_future
579 elif iscoroutine(coro_or_future):
580 return Task(coro_or_future, loop=loop)
581 else:
582 raise TypeError('A Future or coroutine is required')
583
584
class _GatheringFuture(futures.Future):
    """Helper for gather().

    This overrides cancel() to cancel all the children and act more
    like Task.cancel(), which doesn't immediately mark itself as
    cancelled.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        # The futures whose results are being gathered.
        self._children = children

    def cancel(self):
        # Forward the request to every child; this future itself is
        # not marked as cancelled here.
        if not self.done():
            for child in self._children:
                child.cancel()
            return True
        return False
603
604
605def gather(*coros_or_futures, loop=None, return_exceptions=False):
606 """Return a future aggregating results from the given coroutines
607 or futures.
608
609 All futures must share the same event loop. If all the tasks are
610 done successfully, the returned future's result is the list of
611 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500612 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700613 exceptions in the tasks are treated the same as successful
614 results, and gathered in the result list; otherwise, the first
615 raised exception will be immediately propagated to the returned
616 future.
617
618 Cancellation: if the outer Future is cancelled, all children (that
619 have not completed yet) are also cancelled. If any child is
620 cancelled, this is treated as if it raised CancelledError --
621 the outer Future is *not* cancelled in this case. (This is to
622 prevent the cancellation of one child to cause other children to
623 be cancelled.)
624 """
Yury Selivanov622be342014-02-06 22:06:16 -0500625 arg_to_fut = {arg: async(arg, loop=loop) for arg in set(coros_or_futures)}
626 children = [arg_to_fut[arg] for arg in coros_or_futures]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700627 n = len(children)
628 if n == 0:
629 outer = futures.Future(loop=loop)
630 outer.set_result([])
631 return outer
632 if loop is None:
633 loop = children[0]._loop
634 for fut in children:
635 if fut._loop is not loop:
636 raise ValueError("futures are tied to different event loops")
637 outer = _GatheringFuture(children, loop=loop)
638 nfinished = 0
639 results = [None] * n
640
641 def _done_callback(i, fut):
642 nonlocal nfinished
643 if outer._state != futures._PENDING:
644 if fut._exception is not None:
645 # Mark exception retrieved.
646 fut.exception()
647 return
648 if fut._state == futures._CANCELLED:
649 res = futures.CancelledError()
650 if not return_exceptions:
651 outer.set_exception(res)
652 return
653 elif fut._exception is not None:
654 res = fut.exception() # Mark exception retrieved.
655 if not return_exceptions:
656 outer.set_exception(res)
657 return
658 else:
659 res = fut._result
660 results[i] = res
661 nfinished += 1
662 if nfinished == n:
663 outer.set_result(results)
664
665 for i, fut in enumerate(children):
666 fut.add_done_callback(functools.partial(_done_callback, i))
667 return outer
668
669
670def shield(arg, *, loop=None):
671 """Wait for a future, shielding it from cancellation.
672
673 The statement
674
675 res = yield from shield(something())
676
677 is exactly equivalent to the statement
678
679 res = yield from something()
680
681 *except* that if the coroutine containing it is cancelled, the
682 task running in something() is not cancelled. From the POV of
683 something(), the cancellation did not happen. But its caller is
684 still cancelled, so the yield-from expression still raises
685 CancelledError. Note: If something() is cancelled by other means
686 this will still cancel shield().
687
688 If you want to completely ignore cancellation (not recommended)
689 you can combine shield() with a try/except clause, as follows:
690
691 try:
692 res = yield from shield(something())
693 except CancelledError:
694 res = None
695 """
696 inner = async(arg, loop=loop)
697 if inner.done():
698 # Shortcut.
699 return inner
700 loop = inner._loop
701 outer = futures.Future(loop=loop)
702
703 def _done_callback(inner):
704 if outer.cancelled():
705 # Mark inner's result as retrieved.
706 inner.cancelled() or inner.exception()
707 return
708 if inner.cancelled():
709 outer.cancel()
710 else:
711 exc = inner.exception()
712 if exc is not None:
713 outer.set_exception(exc)
714 else:
715 outer.set_result(inner.result())
716
717 inner.add_done_callback(_done_callback)
718 return outer