blob: 0785e1071839f9a6fd7b5528541a76909622401f [file] [log] [blame]
"""Support for tasks, coroutines and the scheduler."""

# Public names exported by a star-import of this module.
__all__ = ['coroutine', 'Task',
           'iscoroutinefunction', 'iscoroutine',
           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
           'wait', 'wait_for', 'as_completed', 'sleep', 'async',
           'gather', 'shield',
           ]
9
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010import concurrent.futures
11import functools
12import inspect
13import linecache
Victor Stinner7ef60cd2014-02-19 23:15:02 +010014import os
15import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import traceback
17import weakref
18
19from . import events
20from . import futures
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
# If you set _DEBUG to true, @coroutine will wrap the resulting
# generator objects in a CoroWrapper instance (defined below).  That
# instance will log a message when the generator is never iterated
# over, which may happen when you forget to use "yield from" with a
# coroutine call.  Note that the value of the _DEBUG flag is taken
# when the decorator is used, so to be of any use it must be set
# before you define your coroutines.  A downside of using this feature
# is that tracebacks show entries for the CoroWrapper.__next__ method
# when _DEBUG is true.
#
# The flag defaults to true when the PYTHONASYNCIODEBUG environment
# variable is set to a non-empty value, unless the interpreter was asked
# to ignore the environment (sys.flags.ignore_environment).
_DEBUG = (not sys.flags.ignore_environment
          and bool(os.environ.get('PYTHONASYNCIODEBUG')))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070034
35
class CoroWrapper:
    # Wrapper for coroutine in _DEBUG mode.
    #
    # It forwards the generator protocol (iteration, send, throw, close)
    # to the wrapped generator object and, when garbage collected, logs
    # an error if the generator was never started.
    #
    # NOTE: this must stay a comment rather than a docstring, because
    # '__doc__' is listed in __slots__ and a class-level docstring would
    # conflict with that slot.

    __slots__ = ['gen', 'func', '__name__', '__doc__']

    def __init__(self, gen, func):
        # gen is the generator object being wrapped; func is the function
        # that produced it (only used for the diagnostic in __del__).
        assert inspect.isgenerator(gen), gen
        self.gen = gen
        self.func = func

    def __iter__(self):
        return self

    def __next__(self):
        return next(self.gen)

    def send(self, *value):
        # We use `*value` because of a bug in CPythons prior
        # to 3.4.1. See issue #21209 and test_yield_from_corowrapper
        # for details.  This workaround should be removed in 3.5.0.
        if len(value) == 1:
            value = value[0]
        return self.gen.send(value)

    def throw(self, exc, value=None, traceback=None):
        # Accept the full generator-protocol signature
        # throw(type[, value[, traceback]]) as well as the single-argument
        # form, so that code delegating to this wrapper can pass all three.
        if value is None and traceback is None:
            return self.gen.throw(exc)
        return self.gen.throw(exc, value, traceback)

    def close(self):
        return self.gen.close()

    # Read-only views of the wrapped generator's introspection attributes,
    # so that code inspecting a coroutine object (such as
    # Task.get_stack(), which reads gi_frame) also works on a wrapper.

    @property
    def gi_frame(self):
        return self.gen.gi_frame

    @property
    def gi_running(self):
        return self.gen.gi_running

    @property
    def gi_code(self):
        return self.gen.gi_code

    def __del__(self):
        # Be careful accessing self.gen: it is missing when __init__
        # failed before assigning the slot.
        gen = getattr(self, 'gen', None)
        frame = getattr(gen, 'gi_frame', None)
        # f_lasti == -1 means the generator's frame never started running.
        if frame is not None and frame.f_lasti == -1:
            func = self.func
            code = func.__code__
            filename = code.co_filename
            lineno = code.co_firstlineno
            logger.error(
                'Coroutine %r defined at %s:%s was never yielded from',
                func.__name__, filename, lineno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070076
77
def coroutine(func):
    """Decorator to mark coroutines.

    If func is a generator function it is used as is; otherwise it is
    wrapped in a generator function that calls it and, when the call
    returns a Future or a generator, yields from that result.

    When _DEBUG was true at decoration time, calling the decorated
    function returns a CoroWrapper around the generator; if that
    wrapper is destroyed before it is ever yielded from, an error
    message is logged.

    The returned function carries an ``_is_coroutine`` attribute set to
    True, which is what iscoroutinefunction() looks for.
    """
    if inspect.isgeneratorfunction(func):
        coro = func
    else:
        @functools.wraps(func)
        def coro(*args, **kw):
            res = func(*args, **kw)
            if isinstance(res, futures.Future) or inspect.isgenerator(res):
                res = yield from res
            return res

    if not _DEBUG:
        wrapper = coro
    else:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            w = CoroWrapper(coro(*args, **kwds), func)
            # CoroWrapper has slots for these so the wrapper can stand in
            # for the generator where its name/doc are displayed.
            w.__name__ = coro.__name__
            w.__doc__ = coro.__doc__
            return w

    wrapper._is_coroutine = True  # For iscoroutinefunction().
    return wrapper
106
107
def iscoroutinefunction(func):
    """Return True if func is a decorated coroutine function.

    The check looks for the ``_is_coroutine`` marker attribute; objects
    without it yield False.
    """
    return getattr(func, '_is_coroutine', False)
111
112
def iscoroutine(obj):
    """Return True if obj is a coroutine object.

    A coroutine object is either a plain generator object or a
    CoroWrapper instance.
    """
    return inspect.isgenerator(obj) or isinstance(obj, CoroWrapper)
116
117
class Task(futures.Future):
    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # Weak set containing all tasks alive.
    _all_tasks = weakref.WeakSet()

    # Dictionary containing tasks that are currently active in
    # all running event loops.  {EventLoop: Task}
    _current_tasks = {}

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        if loop is None:
            loop = events.get_event_loop()
        return cls._current_tasks.get(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        if loop is None:
            loop = events.get_event_loop()
        return {t for t in cls._all_tasks if t._loop is loop}

    def __init__(self, coro, *, loop=None):
        assert iscoroutine(coro), repr(coro)  # Not a coroutine function!
        super().__init__(loop=loop)
        self._coro = iter(coro)  # Use the iterator just in case.
        self._fut_waiter = None
        self._must_cancel = False
        # The first step runs on the next loop iteration, not synchronously.
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    def __repr__(self):
        res = super().__repr__()
        # A pending task with a cancellation request is shown as CANCELLING.
        if (self._must_cancel and
                self._state == futures._PENDING and
                '<PENDING' in res):
            res = res.replace('<PENDING', '<CANCELLING', 1)
        # Insert the coroutine's name just before the first '<'.
        i = res.find('<')
        if i < 0:
            i = len(res)
        res = res[:i] + '(<{}>)'.format(self._coro.__name__) + res[i:]
        return res

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is active, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        frames = []
        f = self._coro.gi_frame
        if f is not None:
            # Walk from the newest frame outwards, then flip the order.
            while f is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(f)
                f = f.f_back
            frames.reverse()
        elif self._exception is not None:
            # Traceback entries are already linked oldest to newest.
            tb = self._exception.__traceback__
            while tb is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(tb.tb_frame)
                tb = tb.tb_next
        return frames

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output goes; by default it goes to sys.stderr.
        """
        extracted_list = []
        checked = set()
        for f in self.get_stack(limit=limit):
            lineno = f.f_lineno
            co = f.f_code
            filename = co.co_filename
            name = co.co_name
            # Refresh the line cache once per file.
            if filename not in checked:
                checked.add(filename)
                linecache.checkcache(filename)
            line = linecache.getline(filename, lineno, f.f_globals)
            extracted_list.append((filename, lineno, name, line))
        exc = self._exception
        if not extracted_list:
            print('No stack for %r' % self, file=file)
        elif exc is not None:
            print('Traceback for %r (most recent call last):' % self,
                  file=file)
        else:
            print('Stack for %r (most recent call last):' % self,
                  file=file)
        traceback.print_list(extracted_list, file=file)
        if exc is not None:
            for line in traceback.format_exception_only(exc.__class__, exc):
                print(line, file=file, end='')

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Contrary to Future.cancel(), this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing it
        completely.  The task may also return a value or raise a
        different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, value=None, exc=None):
        # Advance the coroutine by one step: throw exc into it, send it
        # value, or simply resume it, then act on whatever it yields,
        # returns or raises.
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
        if self._must_cancel:
            # A pending cancellation request overrides value/exc.
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(value).
        try:
            if exc is not None:
                result = coro.throw(exc)
            elif value is not None:
                result = coro.send(value)
            else:
                result = next(coro)
        except StopIteration as exc:
            self.set_result(exc.value)
        except futures.CancelledError as exc:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            # Record the exception on the task but let it propagate.
            self.set_exception(exc)
            raise
        else:
            if isinstance(result, futures.Future):
                # Yielded Future must come from Future.__iter__().
                if result._blocking:
                    result._blocking = False
                    result.add_done_callback(self._wakeup)
                    self._fut_waiter = result
                    # cancel() may have been requested while the
                    # coroutine was running; forward it to the new waiter.
                    if self._must_cancel:
                        if self._fut_waiter.cancel():
                            self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step, None,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        # Done-callback attached to the future the coroutine waits on:
        # resume the coroutine with the future's result or exception.
        try:
            value = future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(None, exc)
        else:
            self._step(value, None)
        self = None  # Needed to break cycles when an exception occurs.
363
364
# wait() and as_completed() similar to those in PEP 3148.

# Accepted values for the return_when argument of wait(); they are the
# same objects as the concurrent.futures constants.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
370
371
372@coroutine
373def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
374 """Wait for the Futures and coroutines given by fs to complete.
375
376 Coroutines will be wrapped in Tasks.
377
378 Returns two sets of Future: (done, pending).
379
380 Usage:
381
382 done, pending = yield from asyncio.wait(fs)
383
384 Note: This does not raise TimeoutError! Futures that aren't done
385 when the timeout occurs are returned in the second set.
386 """
Victor Stinnereb748762014-02-11 11:54:08 +0100387 if isinstance(fs, futures.Future) or iscoroutine(fs):
388 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700389 if not fs:
390 raise ValueError('Set of coroutines/Futures is empty.')
391
392 if loop is None:
393 loop = events.get_event_loop()
394
Yury Selivanov622be342014-02-06 22:06:16 -0500395 fs = {async(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700396
397 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
398 raise ValueError('Invalid return_when value: {}'.format(return_when))
399 return (yield from _wait(fs, timeout, return_when, loop))
400
401
def _release_waiter(waiter, value=True, *args):
    """Set value as the result of waiter unless it is already done.

    Extra positional arguments are accepted and ignored, so this can be
    used directly as a done-callback (which is called with the finished
    future) as well as a timer callback.
    """
    if not waiter.done():
        waiter.set_result(value)
405
406
407@coroutine
408def wait_for(fut, timeout, *, loop=None):
409 """Wait for the single Future or coroutine to complete, with timeout.
410
411 Coroutine will be wrapped in Task.
412
Victor Stinner421e49b2014-01-23 17:40:59 +0100413 Returns result of the Future or coroutine. When a timeout occurs,
414 it cancels the task and raises TimeoutError. To avoid the task
415 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700416
417 Usage:
418
419 result = yield from asyncio.wait_for(fut, 10.0)
420
421 """
422 if loop is None:
423 loop = events.get_event_loop()
424
Guido van Rossum48c66c32014-01-29 14:30:38 -0800425 if timeout is None:
426 return (yield from fut)
427
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700428 waiter = futures.Future(loop=loop)
429 timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False)
430 cb = functools.partial(_release_waiter, waiter, True)
431
432 fut = async(fut, loop=loop)
433 fut.add_done_callback(cb)
434
435 try:
436 if (yield from waiter):
437 return fut.result()
438 else:
439 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100440 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700441 raise futures.TimeoutError()
442 finally:
443 timeout_handle.cancel()
444
445
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a collection of Futures.

    Returns a (done, pending) pair of sets partitioning fs by whether
    each future is done once the wait ends (condition met or timeout).
    """
    assert fs, 'Set of Futures is empty.'
    waiter = futures.Future(loop=loop)
    timeout_handle = None
    if timeout is not None:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)  # Number of futures that have not completed yet.

    def _on_completion(f):
        nonlocal counter
        counter -= 1
        # Wake the waiter when everything is done, on the first
        # completion, or on the first real (non-cancellation) exception,
        # depending on return_when.
        if (counter <= 0 or
                return_when == FIRST_COMPLETED or
                return_when == FIRST_EXCEPTION and (
                    not f.cancelled() and f.exception() is not None)):
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(False)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()

    done, pending = set(), set()
    for f in fs:
        f.remove_done_callback(_on_completion)
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending
487 return done, pending
488
489
490# This is *not* a @coroutine! It is just an iterator (yielding Futures).
491def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800492 """Return an iterator whose values are coroutines.
493
494 When waiting for the yielded coroutines you'll get the results (or
495 exceptions!) of the original Futures (or coroutines), in the order
496 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700497
498 This differs from PEP 3148; the proper way to use this is:
499
500 for f in as_completed(fs):
501 result = yield from f # The 'yield from' may raise.
502 # Use result.
503
Guido van Rossumb58f0532014-02-12 17:58:19 -0800504 If a timeout is specified, the 'yield from' will raise
505 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700506
507 Note: The futures 'f' are not necessarily members of fs.
508 """
Victor Stinnereb748762014-02-11 11:54:08 +0100509 if isinstance(fs, futures.Future) or iscoroutine(fs):
510 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700511 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov622be342014-02-06 22:06:16 -0500512 todo = {async(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800513 from .queues import Queue # Import here to avoid circular import problem.
514 done = Queue(loop=loop)
515 timeout_handle = None
516
517 def _on_timeout():
518 for f in todo:
519 f.remove_done_callback(_on_completion)
520 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
521 todo.clear() # Can't do todo.remove(f) in the loop.
522
523 def _on_completion(f):
524 if not todo:
525 return # _on_timeout() was here first.
526 todo.remove(f)
527 done.put_nowait(f)
528 if not todo and timeout_handle is not None:
529 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700530
531 @coroutine
532 def _wait_for_one():
Guido van Rossumb58f0532014-02-12 17:58:19 -0800533 f = yield from done.get()
534 if f is None:
535 # Dummy value from _on_timeout().
536 raise futures.TimeoutError
537 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700538
Guido van Rossumb58f0532014-02-12 17:58:19 -0800539 for f in todo:
540 f.add_done_callback(_on_completion)
541 if todo and timeout is not None:
542 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700543 for _ in range(len(todo)):
544 yield _wait_for_one()
545
546
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    The value of the result argument is returned to the caller once the
    delay has elapsed.
    """
    future = futures.Future(loop=loop)
    h = future._loop.call_later(delay, future.set_result, result)
    try:
        return (yield from future)
    finally:
        # Cancel the timer in every case, so that it cannot fire on the
        # future after an early exit (e.g. when sleep() is cancelled).
        h.cancel()
555 h.cancel()
556
557
558def async(coro_or_future, *, loop=None):
559 """Wrap a coroutine in a future.
560
561 If the argument is a Future, it is returned directly.
562 """
563 if isinstance(coro_or_future, futures.Future):
564 if loop is not None and loop is not coro_or_future._loop:
565 raise ValueError('loop argument must agree with Future')
566 return coro_or_future
567 elif iscoroutine(coro_or_future):
568 return Task(coro_or_future, loop=loop)
569 else:
570 raise TypeError('A Future or coroutine is required')
571
572
class _GatheringFuture(futures.Future):
    """Helper for gather().

    This overrides cancel() to cancel all the children and act more
    like Task.cancel(), which doesn't immediately mark itself as
    cancelled.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        # The futures whose results this future aggregates.
        self._children = children

    def cancel(self):
        """Cancel every child; return False if this future is already done.

        The future itself is not marked cancelled here.
        """
        if self.done():
            return False
        for child in self._children:
            child.cancel()
        return True
591
592
593def gather(*coros_or_futures, loop=None, return_exceptions=False):
594 """Return a future aggregating results from the given coroutines
595 or futures.
596
597 All futures must share the same event loop. If all the tasks are
598 done successfully, the returned future's result is the list of
599 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500600 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700601 exceptions in the tasks are treated the same as successful
602 results, and gathered in the result list; otherwise, the first
603 raised exception will be immediately propagated to the returned
604 future.
605
606 Cancellation: if the outer Future is cancelled, all children (that
607 have not completed yet) are also cancelled. If any child is
608 cancelled, this is treated as if it raised CancelledError --
609 the outer Future is *not* cancelled in this case. (This is to
610 prevent the cancellation of one child to cause other children to
611 be cancelled.)
612 """
Yury Selivanov622be342014-02-06 22:06:16 -0500613 arg_to_fut = {arg: async(arg, loop=loop) for arg in set(coros_or_futures)}
614 children = [arg_to_fut[arg] for arg in coros_or_futures]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700615 n = len(children)
616 if n == 0:
617 outer = futures.Future(loop=loop)
618 outer.set_result([])
619 return outer
620 if loop is None:
621 loop = children[0]._loop
622 for fut in children:
623 if fut._loop is not loop:
624 raise ValueError("futures are tied to different event loops")
625 outer = _GatheringFuture(children, loop=loop)
626 nfinished = 0
627 results = [None] * n
628
629 def _done_callback(i, fut):
630 nonlocal nfinished
631 if outer._state != futures._PENDING:
632 if fut._exception is not None:
633 # Mark exception retrieved.
634 fut.exception()
635 return
636 if fut._state == futures._CANCELLED:
637 res = futures.CancelledError()
638 if not return_exceptions:
639 outer.set_exception(res)
640 return
641 elif fut._exception is not None:
642 res = fut.exception() # Mark exception retrieved.
643 if not return_exceptions:
644 outer.set_exception(res)
645 return
646 else:
647 res = fut._result
648 results[i] = res
649 nfinished += 1
650 if nfinished == n:
651 outer.set_result(results)
652
653 for i, fut in enumerate(children):
654 fut.add_done_callback(functools.partial(_done_callback, i))
655 return outer
656
657
658def shield(arg, *, loop=None):
659 """Wait for a future, shielding it from cancellation.
660
661 The statement
662
663 res = yield from shield(something())
664
665 is exactly equivalent to the statement
666
667 res = yield from something()
668
669 *except* that if the coroutine containing it is cancelled, the
670 task running in something() is not cancelled. From the POV of
671 something(), the cancellation did not happen. But its caller is
672 still cancelled, so the yield-from expression still raises
673 CancelledError. Note: If something() is cancelled by other means
674 this will still cancel shield().
675
676 If you want to completely ignore cancellation (not recommended)
677 you can combine shield() with a try/except clause, as follows:
678
679 try:
680 res = yield from shield(something())
681 except CancelledError:
682 res = None
683 """
684 inner = async(arg, loop=loop)
685 if inner.done():
686 # Shortcut.
687 return inner
688 loop = inner._loop
689 outer = futures.Future(loop=loop)
690
691 def _done_callback(inner):
692 if outer.cancelled():
693 # Mark inner's result as retrieved.
694 inner.cancelled() or inner.exception()
695 return
696 if inner.cancelled():
697 outer.cancel()
698 else:
699 exc = inner.exception()
700 if exc is not None:
701 outer.set_exception(exc)
702 else:
703 outer.set_result(inner.result())
704
705 inner.add_done_callback(_done_callback)
706 return outer