blob: 0366da35fee140314ff1ec757d4fd7beac1d43b6 [file] [log] [blame]
"""Support for tasks, coroutines and the scheduler."""

# Public names exported by this module.
__all__ = [
    'coroutine',
    'Task',
    'iscoroutinefunction',
    'iscoroutine',
    'FIRST_COMPLETED',
    'FIRST_EXCEPTION',
    'ALL_COMPLETED',
    'wait',
    'wait_for',
    'as_completed',
    'sleep',
    'async',
    'gather',
    'shield',
]
9
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010import concurrent.futures
11import functools
12import inspect
13import linecache
Victor Stinner7ef60cd2014-02-19 23:15:02 +010014import os
15import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import traceback
17import weakref
18
19from . import events
20from . import futures
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
# Debug switch for @coroutine.  When _DEBUG is true, the decorator wraps
# each generator object it produces in a CoroWrapper instance (defined
# below).  The wrapper logs a message if the generator is destroyed
# without ever having been iterated, which typically means a coroutine
# was called without "yield from".  The flag is read at decoration time,
# so it only helps if it is set before the coroutines are defined.  The
# price is that tracebacks gain an extra CoroWrapper.__next__ entry.
#
# The flag is enabled through the PYTHONASYNCIODEBUG environment
# variable, unless the interpreter was told to ignore the environment.
if sys.flags.ignore_environment:
    _DEBUG = False
else:
    _DEBUG = bool(os.environ.get('PYTHONASYNCIODEBUG'))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070034
35
class CoroWrapper:
    # Wrapper for coroutine in _DEBUG mode.
    #
    # It delegates the generator protocol (__next__/send/throw/close) to
    # the wrapped generator and, on destruction, logs an error if the
    # generator was never started.
    #
    # NOTE: this is deliberately a comment rather than a docstring:
    # '__doc__' is listed in __slots__, which would conflict with a
    # class-level __doc__ attribute.

    __slots__ = ['gen', 'func', '__name__', '__doc__']

    def __init__(self, gen, func):
        # gen: the generator object to wrap.
        # func: the function that produced it (used for the log message).
        assert inspect.isgenerator(gen), gen
        self.gen = gen
        self.func = func

    def __iter__(self):
        return self

    def __next__(self):
        return next(self.gen)

    def send(self, *value):
        # We use `*value` because of a bug in CPythons prior
        # to 3.4.1. See issue #21209 and test_yield_from_corowrapper
        # for details.  This workaround should be removed in 3.5.0.
        #
        # In the normal case exactly one argument is passed; it must be
        # unpacked, otherwise the generator would receive a 1-tuple
        # instead of the value itself (and send(None) on a fresh
        # generator would fail).
        if len(value) == 1:
            value = value[0]
        return self.gen.send(value)

    def throw(self, exc):
        return self.gen.throw(exc)

    def close(self):
        return self.gen.close()

    def __del__(self):
        # Be careful: if __init__ failed its assertion, self.gen was
        # never assigned, so look everything up defensively.
        gen = getattr(self, 'gen', None)
        frame = getattr(gen, 'gi_frame', None)
        # f_lasti == -1 means no bytecode of the generator has run yet.
        if frame is not None and frame.f_lasti == -1:
            func = self.func
            code = func.__code__
            filename = code.co_filename
            lineno = code.co_firstlineno
            logger.error(
                'Coroutine %r defined at %s:%s was never yielded from',
                func.__name__, filename, lineno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070074
75
def coroutine(func):
    """Decorator to mark coroutines.

    A generator function is used as is; any other callable is wrapped
    in a generator function that calls it and, if the call returns a
    Future or a generator, delegates to it with "yield from".

    In _DEBUG mode every call returns a CoroWrapper, so that an error
    message is logged if the coroutine is destroyed before it was ever
    yielded from.
    """
    if inspect.isgeneratorfunction(func):
        coro = func
    else:
        @functools.wraps(func)
        def coro(*args, **kw):
            outcome = func(*args, **kw)
            if not (isinstance(outcome, futures.Future)
                    or inspect.isgenerator(outcome)):
                return outcome
            return (yield from outcome)

    if _DEBUG:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            wrapped = CoroWrapper(coro(*args, **kwds), func)
            wrapped.__name__ = coro.__name__
            wrapped.__doc__ = coro.__doc__
            return wrapped
    else:
        wrapper = coro

    # Marker attribute consulted by iscoroutinefunction().
    wrapper._is_coroutine = True
    return wrapper
104
105
def iscoroutinefunction(func):
    """Return True if func is a decorated coroutine function.

    The answer is the value of the func._is_coroutine marker, or False
    when func has no such attribute.
    """
    try:
        return func._is_coroutine
    except AttributeError:
        return False
109
110
def iscoroutine(obj):
    """Return True if obj is a coroutine object.

    That is, either a CoroWrapper instance or a plain generator object.
    """
    if isinstance(obj, CoroWrapper):
        return True
    return inspect.isgenerator(obj)
114
115
class Task(futures.Future):
    """A coroutine wrapped in a Future.

    The coroutine is driven by _step(), which is scheduled on the
    task's event loop; the task's result, exception or cancelled state
    is set from the way the coroutine terminates.
    """

    # An important invariant maintained while a Task is not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # Weak set containing all tasks alive.
    _all_tasks = weakref.WeakSet()

    # Dictionary containing tasks that are currently active in
    # all running event loops.  {EventLoop: Task}
    _current_tasks = {}

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        if loop is None:
            loop = events.get_event_loop()
        return cls._current_tasks.get(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        if loop is None:
            loop = events.get_event_loop()
        return {t for t in cls._all_tasks if t._loop is loop}

    def __init__(self, coro, *, loop=None):
        """Wrap the coroutine object coro and schedule its first step.

        coro must be a coroutine object (not a coroutine function).
        The new task registers itself in the class-wide _all_tasks set.
        """
        assert iscoroutine(coro), repr(coro)  # Not a coroutine function!
        super().__init__(loop=loop)
        self._coro = iter(coro)  # Use the iterator just in case.
        self._fut_waiter = None
        self._must_cancel = False
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    def __repr__(self):
        """Return the Future repr with the coroutine's name inserted.

        A pending task with a cancellation request outstanding is shown
        as CANCELLING instead of PENDING.
        """
        res = super().__repr__()
        if (self._must_cancel and
            self._state == futures._PENDING and
            '<PENDING' in res):
            res = res.replace('<PENDING', '<CANCELLING', 1)
        # Insert '(<name>)' just before the first '<', or append it if
        # the repr contains no '<' at all.
        i = res.find('<')
        if i < 0:
            i = len(res)
        res = res[:i] + '(<{}>)'.format(self._coro.__name__) + res[i:]
        return res

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is active, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        frames = []
        f = self._coro.gi_frame
        if f is not None:
            # Walk from the newest frame outwards, then flip the list so
            # that the oldest frame comes first.
            while f is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(f)
                f = f.f_back
            frames.reverse()
        elif self._exception is not None:
            # Traceback entries are already linked oldest to newest.
            tb = self._exception.__traceback__
            while tb is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(tb.tb_frame)
                tb = tb.tb_next
        return frames

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output goes; by default it goes to sys.stderr.
        """
        extracted_list = []
        checked = set()
        for f in self.get_stack(limit=limit):
            lineno = f.f_lineno
            co = f.f_code
            filename = co.co_filename
            name = co.co_name
            # Refresh the linecache entry once per file.
            if filename not in checked:
                checked.add(filename)
                linecache.checkcache(filename)
            line = linecache.getline(filename, lineno, f.f_globals)
            extracted_list.append((filename, lineno, name, line))
        exc = self._exception
        if not extracted_list:
            print('No stack for %r' % self, file=file)
        elif exc is not None:
            print('Traceback for %r (most recent call last):' % self,
                  file=file)
        else:
            print('Stack for %r (most recent call last):' % self,
                  file=file)
        traceback.print_list(extracted_list, file=file)
        if exc is not None:
            for line in traceback.format_exception_only(exc.__class__, exc):
                print(line, file=file, end='')

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Contrary to Future.cancel(), this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing it
        completely.  The task may also return a value or raise a
        different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Return False if the task is already done, True otherwise.
        """
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, value=None, exc=None):
        """Advance the wrapped coroutine by one step.

        If exc is given it is thrown into the coroutine; otherwise a
        non-None value is sent into it, and with neither the coroutine
        is advanced with next().  A pending cancellation request
        (_must_cancel) replaces exc with a CancelledError.

        Termination of the coroutine sets this task's result or
        exception, or cancels it.  Otherwise the yielded object decides
        what happens next: a blocking Future gets _wakeup() registered
        as a done callback, a bare yield reschedules _step(), and
        anything else reschedules _step() with a RuntimeError.
        """
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        # Record this task as the one currently running on its loop.
        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(value).
        try:
            if exc is not None:
                result = coro.throw(exc)
            elif value is not None:
                result = coro.send(value)
            else:
                result = next(coro)
        except StopIteration as exc:
            self.set_result(exc.value)
        except futures.CancelledError as exc:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            # Record the exception on the task, but let it propagate.
            self.set_exception(exc)
            raise
        else:
            if isinstance(result, futures.Future):
                # Yielded Future must come from Future.__iter__().
                if result._blocking:
                    result._blocking = False
                    result.add_done_callback(self._wakeup)
                    self._fut_waiter = result
                    # cancel() may have been called while the coroutine
                    # was running; forward the request to the new waiter.
                    if self._must_cancel:
                        if self._fut_waiter.cancel():
                            self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step, None,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        """Done callback for the awaited future: resume the coroutine.

        The future's result is passed to _step() as the value to send,
        or its exception as the exception to throw.
        """
        try:
            value = future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(None, exc)
        else:
            self._step(value, None)
        self = None  # Needed to break cycles when an exception occurs.
361
362
# wait() and as_completed() are modelled on their PEP 3148 namesakes;
# the return_when constants are re-exported from concurrent.futures.
FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED = (
    concurrent.futures.FIRST_COMPLETED,
    concurrent.futures.FIRST_EXCEPTION,
    concurrent.futures.ALL_COMPLETED,
)
368
369
370@coroutine
371def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
372 """Wait for the Futures and coroutines given by fs to complete.
373
374 Coroutines will be wrapped in Tasks.
375
376 Returns two sets of Future: (done, pending).
377
378 Usage:
379
380 done, pending = yield from asyncio.wait(fs)
381
382 Note: This does not raise TimeoutError! Futures that aren't done
383 when the timeout occurs are returned in the second set.
384 """
Victor Stinnereb748762014-02-11 11:54:08 +0100385 if isinstance(fs, futures.Future) or iscoroutine(fs):
386 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700387 if not fs:
388 raise ValueError('Set of coroutines/Futures is empty.')
389
390 if loop is None:
391 loop = events.get_event_loop()
392
Yury Selivanov622be342014-02-06 22:06:16 -0500393 fs = {async(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700394
395 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
396 raise ValueError('Invalid return_when value: {}'.format(return_when))
397 return (yield from _wait(fs, timeout, return_when, loop))
398
399
def _release_waiter(waiter, value=True, *args):
    """Set value as the result of waiter unless waiter is already done.

    Any extra positional arguments are accepted and ignored, so the
    function can be used directly as a callback.
    """
    if waiter.done():
        return
    waiter.set_result(value)
403
404
405@coroutine
406def wait_for(fut, timeout, *, loop=None):
407 """Wait for the single Future or coroutine to complete, with timeout.
408
409 Coroutine will be wrapped in Task.
410
Victor Stinner421e49b2014-01-23 17:40:59 +0100411 Returns result of the Future or coroutine. When a timeout occurs,
412 it cancels the task and raises TimeoutError. To avoid the task
413 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700414
415 Usage:
416
417 result = yield from asyncio.wait_for(fut, 10.0)
418
419 """
420 if loop is None:
421 loop = events.get_event_loop()
422
Guido van Rossum48c66c32014-01-29 14:30:38 -0800423 if timeout is None:
424 return (yield from fut)
425
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700426 waiter = futures.Future(loop=loop)
427 timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False)
428 cb = functools.partial(_release_waiter, waiter, True)
429
430 fut = async(fut, loop=loop)
431 fut.add_done_callback(cb)
432
433 try:
434 if (yield from waiter):
435 return fut.result()
436 else:
437 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100438 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700439 raise futures.TimeoutError()
440 finally:
441 timeout_handle.cancel()
442
443
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    fs must be a non-empty collection of Futures.  Returns the pair
    (done, pending) of sets of futures once the return_when condition
    is met or the timeout (if not None) has expired.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = futures.Future(loop=loop)
    if timeout is None:
        timeout_handle = None
    else:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    remaining = len(fs)

    def _on_completion(f):
        nonlocal remaining
        remaining -= 1
        satisfied = remaining <= 0 or return_when == FIRST_COMPLETED
        if not satisfied and return_when == FIRST_EXCEPTION:
            # Only a future that actually failed (as opposed to one
            # that was cancelled) ends a FIRST_EXCEPTION wait early.
            satisfied = not f.cancelled() and f.exception() is not None
        if not satisfied:
            return
        if timeout_handle is not None:
            timeout_handle.cancel()
        if not waiter.done():
            waiter.set_result(False)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()

    done = set()
    pending = set()
    for f in fs:
        f.remove_done_callback(_on_completion)
        bucket = done if f.done() else pending
        bucket.add(f)
    return done, pending
486
487
488# This is *not* a @coroutine! It is just an iterator (yielding Futures).
489def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800490 """Return an iterator whose values are coroutines.
491
492 When waiting for the yielded coroutines you'll get the results (or
493 exceptions!) of the original Futures (or coroutines), in the order
494 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700495
496 This differs from PEP 3148; the proper way to use this is:
497
498 for f in as_completed(fs):
499 result = yield from f # The 'yield from' may raise.
500 # Use result.
501
Guido van Rossumb58f0532014-02-12 17:58:19 -0800502 If a timeout is specified, the 'yield from' will raise
503 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700504
505 Note: The futures 'f' are not necessarily members of fs.
506 """
Victor Stinnereb748762014-02-11 11:54:08 +0100507 if isinstance(fs, futures.Future) or iscoroutine(fs):
508 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700509 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov622be342014-02-06 22:06:16 -0500510 todo = {async(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800511 from .queues import Queue # Import here to avoid circular import problem.
512 done = Queue(loop=loop)
513 timeout_handle = None
514
515 def _on_timeout():
516 for f in todo:
517 f.remove_done_callback(_on_completion)
518 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
519 todo.clear() # Can't do todo.remove(f) in the loop.
520
521 def _on_completion(f):
522 if not todo:
523 return # _on_timeout() was here first.
524 todo.remove(f)
525 done.put_nowait(f)
526 if not todo and timeout_handle is not None:
527 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700528
529 @coroutine
530 def _wait_for_one():
Guido van Rossumb58f0532014-02-12 17:58:19 -0800531 f = yield from done.get()
532 if f is None:
533 # Dummy value from _on_timeout().
534 raise futures.TimeoutError
535 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700536
Guido van Rossumb58f0532014-02-12 17:58:19 -0800537 for f in todo:
538 f.add_done_callback(_on_completion)
539 if todo and timeout is not None:
540 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700541 for _ in range(len(todo)):
542 yield _wait_for_one()
543
544
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    The value of result is returned to the caller once the delay has
    elapsed.
    """
    timer = futures.Future(loop=loop)
    handle = timer._loop.call_later(delay, timer.set_result, result)
    try:
        outcome = yield from timer
        return outcome
    finally:
        # Cancel the pending timer callback on every exit path.
        handle.cancel()
554
555
556def async(coro_or_future, *, loop=None):
557 """Wrap a coroutine in a future.
558
559 If the argument is a Future, it is returned directly.
560 """
561 if isinstance(coro_or_future, futures.Future):
562 if loop is not None and loop is not coro_or_future._loop:
563 raise ValueError('loop argument must agree with Future')
564 return coro_or_future
565 elif iscoroutine(coro_or_future):
566 return Task(coro_or_future, loop=loop)
567 else:
568 raise TypeError('A Future or coroutine is required')
569
570
class _GatheringFuture(futures.Future):
    """Helper for gather().

    Its cancel() forwards the request to every child instead of
    cancelling the future itself, which makes it behave more like
    Task.cancel(): the future is not immediately marked as cancelled.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        # The futures whose results are being gathered.
        self._children = children

    def cancel(self):
        """Cancel all children; return False if already done, else True."""
        if not self.done():
            for fut in self._children:
                fut.cancel()
            return True
        return False
589
590
591def gather(*coros_or_futures, loop=None, return_exceptions=False):
592 """Return a future aggregating results from the given coroutines
593 or futures.
594
595 All futures must share the same event loop. If all the tasks are
596 done successfully, the returned future's result is the list of
597 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500598 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700599 exceptions in the tasks are treated the same as successful
600 results, and gathered in the result list; otherwise, the first
601 raised exception will be immediately propagated to the returned
602 future.
603
604 Cancellation: if the outer Future is cancelled, all children (that
605 have not completed yet) are also cancelled. If any child is
606 cancelled, this is treated as if it raised CancelledError --
607 the outer Future is *not* cancelled in this case. (This is to
608 prevent the cancellation of one child to cause other children to
609 be cancelled.)
610 """
Yury Selivanov622be342014-02-06 22:06:16 -0500611 arg_to_fut = {arg: async(arg, loop=loop) for arg in set(coros_or_futures)}
612 children = [arg_to_fut[arg] for arg in coros_or_futures]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700613 n = len(children)
614 if n == 0:
615 outer = futures.Future(loop=loop)
616 outer.set_result([])
617 return outer
618 if loop is None:
619 loop = children[0]._loop
620 for fut in children:
621 if fut._loop is not loop:
622 raise ValueError("futures are tied to different event loops")
623 outer = _GatheringFuture(children, loop=loop)
624 nfinished = 0
625 results = [None] * n
626
627 def _done_callback(i, fut):
628 nonlocal nfinished
629 if outer._state != futures._PENDING:
630 if fut._exception is not None:
631 # Mark exception retrieved.
632 fut.exception()
633 return
634 if fut._state == futures._CANCELLED:
635 res = futures.CancelledError()
636 if not return_exceptions:
637 outer.set_exception(res)
638 return
639 elif fut._exception is not None:
640 res = fut.exception() # Mark exception retrieved.
641 if not return_exceptions:
642 outer.set_exception(res)
643 return
644 else:
645 res = fut._result
646 results[i] = res
647 nfinished += 1
648 if nfinished == n:
649 outer.set_result(results)
650
651 for i, fut in enumerate(children):
652 fut.add_done_callback(functools.partial(_done_callback, i))
653 return outer
654
655
656def shield(arg, *, loop=None):
657 """Wait for a future, shielding it from cancellation.
658
659 The statement
660
661 res = yield from shield(something())
662
663 is exactly equivalent to the statement
664
665 res = yield from something()
666
667 *except* that if the coroutine containing it is cancelled, the
668 task running in something() is not cancelled. From the POV of
669 something(), the cancellation did not happen. But its caller is
670 still cancelled, so the yield-from expression still raises
671 CancelledError. Note: If something() is cancelled by other means
672 this will still cancel shield().
673
674 If you want to completely ignore cancellation (not recommended)
675 you can combine shield() with a try/except clause, as follows:
676
677 try:
678 res = yield from shield(something())
679 except CancelledError:
680 res = None
681 """
682 inner = async(arg, loop=loop)
683 if inner.done():
684 # Shortcut.
685 return inner
686 loop = inner._loop
687 outer = futures.Future(loop=loop)
688
689 def _done_callback(inner):
690 if outer.cancelled():
691 # Mark inner's result as retrieved.
692 inner.cancelled() or inner.exception()
693 return
694 if inner.cancelled():
695 outer.cancel()
696 else:
697 exc = inner.exception()
698 if exc is not None:
699 outer.set_exception(exc)
700 else:
701 outer.set_result(inner.result())
702
703 inner.add_done_callback(_done_callback)
704 return outer