blob: 45a6342eea4eeb42fe390eff84fa4a1a694f4a3a [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
# Public names of this module (what "from ... import *" exports).
__all__ = ['coroutine', 'Task',
           'iscoroutinefunction', 'iscoroutine',
           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
           'wait', 'wait_for', 'as_completed', 'sleep', 'async',
           'gather', 'shield',
           ]
9
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010import concurrent.futures
11import functools
12import inspect
13import linecache
Victor Stinner7ef60cd2014-02-19 23:15:02 +010014import os
15import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import traceback
17import weakref
18
19from . import events
20from . import futures
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
# Debug switch for the @coroutine decorator.
#
# When _DEBUG is true, @coroutine wraps each generator object it
# produces in a CoroWrapper instance (defined below).  That wrapper
# logs a message if the generator is destroyed without ever having
# been iterated over, which usually means a "yield from" was forgotten
# on a coroutine call.
#
# The flag is read at the moment the decorator is applied, so it is
# only useful if it is set before your coroutines are defined.  The
# price of enabling it is that tracebacks contain extra entries for
# CoroWrapper.__next__.
#
# It is switched on by a non-empty PYTHONASYNCIODEBUG environment
# variable, unless the interpreter was told to ignore the environment.
_DEBUG = (bool(os.environ.get('PYTHONASYNCIODEBUG'))
          and not sys.flags.ignore_environment)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070034
35
class CoroWrapper:
    # Wrapper for coroutine in _DEBUG mode.
    #
    # It forwards the generator protocol (__next__/send/throw/close and
    # the gi_* attributes) to the wrapped generator, and logs an error
    # from __del__ if the generator was never started.
    #
    # NOTE: this class deliberately has no docstring: '__doc__' is one of
    # its slots (so instances can carry the wrapped function's docstring)
    # and a class-level docstring would conflict with that slot.

    __slots__ = ['gen', 'func', '__name__', '__doc__', '__weakref__']

    def __init__(self, gen, func):
        """Wrap generator object *gen*, which was produced by *func*."""
        assert inspect.isgenerator(gen), gen
        self.gen = gen
        self.func = func

    def __iter__(self):
        """Return self; the wrapper is its own iterator."""
        return self

    def __next__(self):
        """Advance the wrapped generator and return what it yields."""
        return next(self.gen)

    def send(self, *value):
        """Send a value into the wrapped generator."""
        # We use `*value` because of a bug in CPythons prior
        # to 3.4.1.  See issue #21209 and test_yield_from_corowrapper
        # for details.  This workaround should be removed in 3.5.0.
        if len(value) == 1:
            value = value[0]
        return self.gen.send(value)

    def throw(self, exc, *args):
        """Raise an exception inside the wrapped generator.

        *exc* is an exception instance or class.  Any further
        positional arguments (the value and traceback of the
        three-argument form of generator.throw()) are forwarded
        unchanged, so callers that use that form no longer fail with
        a TypeError.
        """
        return self.gen.throw(exc, *args)

    def close(self):
        """Close the wrapped generator."""
        return self.gen.close()

    @property
    def gi_frame(self):
        """The gi_frame attribute of the wrapped generator."""
        return self.gen.gi_frame

    @property
    def gi_running(self):
        """The gi_running attribute of the wrapped generator."""
        return self.gen.gi_running

    @property
    def gi_code(self):
        """The gi_code attribute of the wrapped generator."""
        return self.gen.gi_code

    def __del__(self):
        # Be careful accessing self.gen.frame -- self.gen might not exist.
        gen = getattr(self, 'gen', None)
        frame = getattr(gen, 'gi_frame', None)
        # f_lasti == -1 means no bytecode of the generator has been
        # executed yet, i.e. it was never iterated.
        if frame is not None and frame.f_lasti == -1:
            func = self.func
            code = func.__code__
            filename = code.co_filename
            lineno = code.co_firstlineno
            logger.error(
                'Coroutine %r defined at %s:%s was never yielded from',
                func.__name__, filename, lineno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070090
91
def coroutine(func):
    """Decorator that marks *func* as a coroutine function.

    A generator function is used as is.  Any other callable is wrapped
    in a generator function that calls it and, if the call returns a
    Future or a generator, delegates to that with "yield from".

    In _DEBUG mode every call returns a CoroWrapper around the
    generator, so that an error is logged if the coroutine is destroyed
    without ever having been yielded from.

    The returned function carries an ``_is_coroutine`` attribute, which
    is what iscoroutinefunction() looks for.
    """
    if not inspect.isgeneratorfunction(func):
        @functools.wraps(func)
        def coro(*args, **kw):
            outcome = func(*args, **kw)
            needs_delegation = (isinstance(outcome, futures.Future)
                                or inspect.isgenerator(outcome))
            if needs_delegation:
                outcome = yield from outcome
            return outcome
    else:
        coro = func

    if _DEBUG:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            wrapped = CoroWrapper(coro(*args, **kwds), func)
            # CoroWrapper has slots for these, so the wrapper object
            # presents the coroutine's own name and docstring.
            wrapped.__name__ = coro.__name__
            wrapped.__doc__ = coro.__doc__
            return wrapped
    else:
        wrapper = coro

    wrapper._is_coroutine = True  # For iscoroutinefunction().
    return wrapper
120
121
def iscoroutinefunction(func):
    """Tell whether *func* was decorated with @coroutine.

    The value of the function's ``_is_coroutine`` attribute is
    returned, or False when there is no such attribute.
    """
    try:
        return func._is_coroutine
    except AttributeError:
        return False
125
126
def iscoroutine(obj):
    """Tell whether *obj* is a coroutine object.

    That is the case for CoroWrapper instances and for generator
    objects.
    """
    if isinstance(obj, CoroWrapper):
        return True
    return inspect.isgenerator(obj)
130
131
class Task(futures.Future):
    """A coroutine wrapped in a Future.

    Creating a Task schedules the first step of the coroutine on the
    event loop.  Each step resumes the coroutine until it yields, returns
    or raises; the Task's result or exception is set from how the
    coroutine terminates.
    """

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # Weak set containing all tasks alive.
    _all_tasks = weakref.WeakSet()

    # Dictionary containing tasks that are currently active in
    # all running event loops.  {EventLoop: Task}
    _current_tasks = {}

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        if loop is None:
            loop = events.get_event_loop()
        return cls._current_tasks.get(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        if loop is None:
            loop = events.get_event_loop()
        return {t for t in cls._all_tasks if t._loop is loop}

    def __init__(self, coro, *, loop=None):
        """Wrap coroutine object *coro* and schedule its first step.

        *coro* must be a coroutine object (see iscoroutine()), not a
        coroutine function.  *loop* is passed on to Future.__init__().
        """
        assert iscoroutine(coro), repr(coro)  # Not a coroutine function!
        super().__init__(loop=loop)
        self._coro = iter(coro)  # Use the iterator just in case.
        # Future this task is currently waiting on, if any (see the
        # invariant described at the top of the class).
        self._fut_waiter = None
        # Set by cancel() when the cancellation has to be delivered by
        # the next _step() call.
        self._must_cancel = False
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    def __repr__(self):
        """Return the Future repr, extended with the coroutine's name.

        A pending task with a cancellation request outstanding is shown
        as CANCELLING instead of PENDING.
        """
        res = super().__repr__()
        if (self._must_cancel and
            self._state == futures._PENDING and
            '<PENDING' in res):
            res = res.replace('<PENDING', '<CANCELLING', 1)
        # Insert "(<name>)" in front of the first '<', or append it if
        # the repr contains no '<' at all.
        i = res.find('<')
        if i < 0:
            i = len(res)
        res = res[:i] + '(<{}>)'.format(self._coro.__name__) + res[i:]
        return res

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is active, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        frames = []
        f = self._coro.gi_frame
        if f is not None:
            # Walk from the coroutine's frame outwards via f_back ...
            while f is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(f)
                f = f.f_back
            # ... then flip the list so the oldest frame comes first.
            frames.reverse()
        elif self._exception is not None:
            # No frame on the coroutine: use the frames recorded in the
            # traceback of the stored exception.
            tb = self._exception.__traceback__
            while tb is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(tb.tb_frame)
                tb = tb.tb_next
        return frames

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output goes; by default it goes to sys.stderr.
        """
        extracted_list = []
        # Files whose linecache entry was already refreshed in this call.
        checked = set()
        for f in self.get_stack(limit=limit):
            lineno = f.f_lineno
            co = f.f_code
            filename = co.co_filename
            name = co.co_name
            if filename not in checked:
                checked.add(filename)
                linecache.checkcache(filename)
            line = linecache.getline(filename, lineno, f.f_globals)
            extracted_list.append((filename, lineno, name, line))
        exc = self._exception
        if not extracted_list:
            print('No stack for %r' % self, file=file)
        elif exc is not None:
            print('Traceback for %r (most recent call last):' % self,
                  file=file)
        else:
            print('Stack for %r (most recent call last):' % self,
                  file=file)
        traceback.print_list(extracted_list, file=file)
        if exc is not None:
            for line in traceback.format_exception_only(exc.__class__, exc):
                print(line, file=file, end='')

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Contrary to Future.cancel(), this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing it
        completely.  The task may also return a value or raise a
        different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, value=None, exc=None):
        """Run the coroutine until its next yield, return or exception.

        If *exc* is given it is thrown into the coroutine; otherwise
        *value* is sent into it (or next() is used when *value* is
        None).  A pending cancellation request replaces *exc* with a
        CancelledError.

        Depending on what the coroutine does, the task's result,
        exception or cancelled state is set, or the task arranges to be
        resumed: by _wakeup() when a Future was yielded via "yield
        from", or by another scheduled _step() call otherwise.
        """
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        # Register as the current task of this loop while the coroutine
        # runs; undone in the finally clause below.
        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(value).
        try:
            if exc is not None:
                result = coro.throw(exc)
            elif value is not None:
                result = coro.send(value)
            else:
                result = next(coro)
        except StopIteration as exc:
            # The coroutine returned; its return value is the result.
            self.set_result(exc.value)
        except futures.CancelledError as exc:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            # Store the exception on the task, but also let it propagate.
            self.set_exception(exc)
            raise
        else:
            if isinstance(result, futures.Future):
                # Yielded Future must come from Future.__iter__().
                if result._blocking:
                    result._blocking = False
                    result.add_done_callback(self._wakeup)
                    self._fut_waiter = result
                    # cancel() may have been called while the coroutine
                    # was running; pass the request on to the future
                    # we are now waiting for.
                    if self._must_cancel:
                        if self._fut_waiter.cancel():
                            self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step, None,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        """Done-callback for the future this task was waiting on.

        Resumes the coroutine with the future's result, or with the
        exception raised when retrieving that result.
        """
        try:
            value = future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(None, exc)
        else:
            self._step(value, None)
        self = None  # Needed to break cycles when an exception occurs.
377
378
# wait() and as_completed() similar to those in PEP 3148.

# Values accepted by the return_when argument of wait(); these are the
# very same objects as the constants of concurrent.futures.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
384
385
386@coroutine
387def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
388 """Wait for the Futures and coroutines given by fs to complete.
389
390 Coroutines will be wrapped in Tasks.
391
392 Returns two sets of Future: (done, pending).
393
394 Usage:
395
396 done, pending = yield from asyncio.wait(fs)
397
398 Note: This does not raise TimeoutError! Futures that aren't done
399 when the timeout occurs are returned in the second set.
400 """
Victor Stinnereb748762014-02-11 11:54:08 +0100401 if isinstance(fs, futures.Future) or iscoroutine(fs):
402 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700403 if not fs:
404 raise ValueError('Set of coroutines/Futures is empty.')
405
406 if loop is None:
407 loop = events.get_event_loop()
408
Yury Selivanov622be342014-02-06 22:06:16 -0500409 fs = {async(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700410
411 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
412 raise ValueError('Invalid return_when value: {}'.format(return_when))
413 return (yield from _wait(fs, timeout, return_when, loop))
414
415
def _release_waiter(waiter, value=True, *args):
    """Give *waiter* the result *value*, unless it is already done.

    Extra positional arguments are accepted and ignored, so that the
    function can be registered (through functools.partial) as a done
    callback, which is called with the completed future.
    """
    if waiter.done():
        return
    waiter.set_result(value)
419
420
421@coroutine
422def wait_for(fut, timeout, *, loop=None):
423 """Wait for the single Future or coroutine to complete, with timeout.
424
425 Coroutine will be wrapped in Task.
426
Victor Stinner421e49b2014-01-23 17:40:59 +0100427 Returns result of the Future or coroutine. When a timeout occurs,
428 it cancels the task and raises TimeoutError. To avoid the task
429 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700430
431 Usage:
432
433 result = yield from asyncio.wait_for(fut, 10.0)
434
435 """
436 if loop is None:
437 loop = events.get_event_loop()
438
Guido van Rossum48c66c32014-01-29 14:30:38 -0800439 if timeout is None:
440 return (yield from fut)
441
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700442 waiter = futures.Future(loop=loop)
443 timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False)
444 cb = functools.partial(_release_waiter, waiter, True)
445
446 fut = async(fut, loop=loop)
447 fut.add_done_callback(cb)
448
449 try:
450 if (yield from waiter):
451 return fut.result()
452 else:
453 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100454 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700455 raise futures.TimeoutError()
456 finally:
457 timeout_handle.cancel()
458
459
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a non-empty collection of Futures.

    Waits until the condition selected by return_when is met or the
    timeout (if not None) expires, then returns the futures of fs
    split into two sets: (done, pending).
    """
    assert fs, 'Set of Futures is empty.'
    waiter = futures.Future(loop=loop)
    if timeout is None:
        timeout_handle = None
    else:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)

    def _on_completion(f):
        nonlocal counter
        counter -= 1
        if counter > 0 and return_when != FIRST_COMPLETED:
            # Futures are still outstanding; the only remaining reason
            # to wake up is FIRST_EXCEPTION with a future that failed.
            if return_when != FIRST_EXCEPTION:
                return
            if f.cancelled() or f.exception() is None:
                return
        if timeout_handle is not None:
            timeout_handle.cancel()
        if not waiter.done():
            waiter.set_result(False)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()

    done, pending = set(), set()
    for f in fs:
        f.remove_done_callback(_on_completion)
        (done if f.done() else pending).add(f)
    return done, pending
502
503
504# This is *not* a @coroutine! It is just an iterator (yielding Futures).
505def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800506 """Return an iterator whose values are coroutines.
507
508 When waiting for the yielded coroutines you'll get the results (or
509 exceptions!) of the original Futures (or coroutines), in the order
510 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700511
512 This differs from PEP 3148; the proper way to use this is:
513
514 for f in as_completed(fs):
515 result = yield from f # The 'yield from' may raise.
516 # Use result.
517
Guido van Rossumb58f0532014-02-12 17:58:19 -0800518 If a timeout is specified, the 'yield from' will raise
519 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700520
521 Note: The futures 'f' are not necessarily members of fs.
522 """
Victor Stinnereb748762014-02-11 11:54:08 +0100523 if isinstance(fs, futures.Future) or iscoroutine(fs):
524 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700525 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov622be342014-02-06 22:06:16 -0500526 todo = {async(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800527 from .queues import Queue # Import here to avoid circular import problem.
528 done = Queue(loop=loop)
529 timeout_handle = None
530
531 def _on_timeout():
532 for f in todo:
533 f.remove_done_callback(_on_completion)
534 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
535 todo.clear() # Can't do todo.remove(f) in the loop.
536
537 def _on_completion(f):
538 if not todo:
539 return # _on_timeout() was here first.
540 todo.remove(f)
541 done.put_nowait(f)
542 if not todo and timeout_handle is not None:
543 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700544
545 @coroutine
546 def _wait_for_one():
Guido van Rossumb58f0532014-02-12 17:58:19 -0800547 f = yield from done.get()
548 if f is None:
549 # Dummy value from _on_timeout().
550 raise futures.TimeoutError
551 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700552
Guido van Rossumb58f0532014-02-12 17:58:19 -0800553 for f in todo:
554 f.add_done_callback(_on_completion)
555 if todo and timeout is not None:
556 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700557 for _ in range(len(todo)):
558 yield _wait_for_one()
559
560
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    *result* is what the coroutine returns once the delay has passed.
    The timer is cancelled when the coroutine exits for any reason.
    """
    waiter = futures.Future(loop=loop)
    timer = waiter._loop.call_later(delay, waiter.set_result, result)
    try:
        outcome = yield from waiter
        return outcome
    finally:
        timer.cancel()
570
571
572def async(coro_or_future, *, loop=None):
573 """Wrap a coroutine in a future.
574
575 If the argument is a Future, it is returned directly.
576 """
577 if isinstance(coro_or_future, futures.Future):
578 if loop is not None and loop is not coro_or_future._loop:
579 raise ValueError('loop argument must agree with Future')
580 return coro_or_future
581 elif iscoroutine(coro_or_future):
582 return Task(coro_or_future, loop=loop)
583 else:
584 raise TypeError('A Future or coroutine is required')
585
586
class _GatheringFuture(futures.Future):
    """Future returned by gather().

    Its cancel() forwards the request to all the children instead of
    cancelling the future itself, which makes it behave more like
    Task.cancel(): the future is not immediately marked as cancelled.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children

    def cancel(self):
        """Cancel every child; return False if this future is already done."""
        if not self.done():
            for child in self._children:
                child.cancel()
            return True
        return False
605
606
607def gather(*coros_or_futures, loop=None, return_exceptions=False):
608 """Return a future aggregating results from the given coroutines
609 or futures.
610
611 All futures must share the same event loop. If all the tasks are
612 done successfully, the returned future's result is the list of
613 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500614 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700615 exceptions in the tasks are treated the same as successful
616 results, and gathered in the result list; otherwise, the first
617 raised exception will be immediately propagated to the returned
618 future.
619
620 Cancellation: if the outer Future is cancelled, all children (that
621 have not completed yet) are also cancelled. If any child is
622 cancelled, this is treated as if it raised CancelledError --
623 the outer Future is *not* cancelled in this case. (This is to
624 prevent the cancellation of one child to cause other children to
625 be cancelled.)
626 """
Yury Selivanov622be342014-02-06 22:06:16 -0500627 arg_to_fut = {arg: async(arg, loop=loop) for arg in set(coros_or_futures)}
628 children = [arg_to_fut[arg] for arg in coros_or_futures]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700629 n = len(children)
630 if n == 0:
631 outer = futures.Future(loop=loop)
632 outer.set_result([])
633 return outer
634 if loop is None:
635 loop = children[0]._loop
636 for fut in children:
637 if fut._loop is not loop:
638 raise ValueError("futures are tied to different event loops")
639 outer = _GatheringFuture(children, loop=loop)
640 nfinished = 0
641 results = [None] * n
642
643 def _done_callback(i, fut):
644 nonlocal nfinished
645 if outer._state != futures._PENDING:
646 if fut._exception is not None:
647 # Mark exception retrieved.
648 fut.exception()
649 return
650 if fut._state == futures._CANCELLED:
651 res = futures.CancelledError()
652 if not return_exceptions:
653 outer.set_exception(res)
654 return
655 elif fut._exception is not None:
656 res = fut.exception() # Mark exception retrieved.
657 if not return_exceptions:
658 outer.set_exception(res)
659 return
660 else:
661 res = fut._result
662 results[i] = res
663 nfinished += 1
664 if nfinished == n:
665 outer.set_result(results)
666
667 for i, fut in enumerate(children):
668 fut.add_done_callback(functools.partial(_done_callback, i))
669 return outer
670
671
672def shield(arg, *, loop=None):
673 """Wait for a future, shielding it from cancellation.
674
675 The statement
676
677 res = yield from shield(something())
678
679 is exactly equivalent to the statement
680
681 res = yield from something()
682
683 *except* that if the coroutine containing it is cancelled, the
684 task running in something() is not cancelled. From the POV of
685 something(), the cancellation did not happen. But its caller is
686 still cancelled, so the yield-from expression still raises
687 CancelledError. Note: If something() is cancelled by other means
688 this will still cancel shield().
689
690 If you want to completely ignore cancellation (not recommended)
691 you can combine shield() with a try/except clause, as follows:
692
693 try:
694 res = yield from shield(something())
695 except CancelledError:
696 res = None
697 """
698 inner = async(arg, loop=loop)
699 if inner.done():
700 # Shortcut.
701 return inner
702 loop = inner._loop
703 outer = futures.Future(loop=loop)
704
705 def _done_callback(inner):
706 if outer.cancelled():
707 # Mark inner's result as retrieved.
708 inner.cancelled() or inner.exception()
709 return
710 if inner.cancelled():
711 outer.cancel()
712 else:
713 exc = inner.exception()
714 if exc is not None:
715 outer.set_exception(exc)
716 else:
717 outer.set_result(inner.result())
718
719 inner.add_done_callback(_done_callback)
720 return outer