blob: 153f731a76d156d5e42959bea8ed247c072558f4 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
3__all__ = ['coroutine', 'Task',
Guido van Rossum68816ef2013-12-28 08:06:40 -10004 'iscoroutinefunction', 'iscoroutine',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
6 'wait', 'wait_for', 'as_completed', 'sleep', 'async',
Guido van Rossumde3a1362013-11-29 09:29:00 -08007 'gather', 'shield',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008 ]
9
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010import concurrent.futures
11import functools
12import inspect
13import linecache
Victor Stinner7ef60cd2014-02-19 23:15:02 +010014import os
15import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import traceback
17import weakref
18
19from . import events
20from . import futures
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
23# If you set _DEBUG to true, @coroutine will wrap the resulting
24# generator objects in a CoroWrapper instance (defined below). That
25# instance will log a message when the generator is never iterated
26# over, which may happen when you forget to use "yield from" with a
27# coroutine call. Note that the value of the _DEBUG flag is taken
28# when the decorator is used, so to be of any use it must be set
29# before you define your coroutines. A downside of using this feature
30# is that tracebacks show entries for the CoroWrapper.__next__ method
31# when _DEBUG is true.
Victor Stinner7ef60cd2014-02-19 23:15:02 +010032_DEBUG = (not sys.flags.ignore_environment
33 and bool(os.environ.get('PYTHONASYNCIODEBUG')))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070034
35
36class CoroWrapper:
Guido van Rossume1f55442014-01-16 11:05:23 -080037 # Wrapper for coroutine in _DEBUG mode.
38
Victor Stinnerbac77932014-01-16 01:55:29 +010039 __slots__ = ['gen', 'func', '__name__', '__doc__']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070040
41 def __init__(self, gen, func):
42 assert inspect.isgenerator(gen), gen
43 self.gen = gen
44 self.func = func
45
46 def __iter__(self):
47 return self
48
49 def __next__(self):
50 return next(self.gen)
51
52 def send(self, value):
53 return self.gen.send(value)
54
55 def throw(self, exc):
56 return self.gen.throw(exc)
57
58 def close(self):
59 return self.gen.close()
60
61 def __del__(self):
62 frame = self.gen.gi_frame
63 if frame is not None and frame.f_lasti == -1:
64 func = self.func
65 code = func.__code__
66 filename = code.co_filename
67 lineno = code.co_firstlineno
Guido van Rossum2b430b82013-11-01 14:13:30 -070068 logger.error(
69 'Coroutine %r defined at %s:%s was never yielded from',
70 func.__name__, filename, lineno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070071
72
73def coroutine(func):
74 """Decorator to mark coroutines.
75
76 If the coroutine is not yielded from before it is destroyed,
77 an error message is logged.
78 """
79 if inspect.isgeneratorfunction(func):
80 coro = func
81 else:
82 @functools.wraps(func)
83 def coro(*args, **kw):
84 res = func(*args, **kw)
85 if isinstance(res, futures.Future) or inspect.isgenerator(res):
86 res = yield from res
87 return res
88
89 if not _DEBUG:
90 wrapper = coro
91 else:
92 @functools.wraps(func)
93 def wrapper(*args, **kwds):
94 w = CoroWrapper(coro(*args, **kwds), func)
95 w.__name__ = coro.__name__
96 w.__doc__ = coro.__doc__
97 return w
98
99 wrapper._is_coroutine = True # For iscoroutinefunction().
100 return wrapper
101
102
103def iscoroutinefunction(func):
104 """Return True if func is a decorated coroutine function."""
105 return getattr(func, '_is_coroutine', False)
106
107
108def iscoroutine(obj):
109 """Return True if obj is a coroutine object."""
110 return isinstance(obj, CoroWrapper) or inspect.isgenerator(obj)
111
112
113class Task(futures.Future):
114 """A coroutine wrapped in a Future."""
115
116 # An important invariant maintained while a Task not done:
117 #
118 # - Either _fut_waiter is None, and _step() is scheduled;
119 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
120 #
121 # The only transition from the latter to the former is through
122 # _wakeup(). When _fut_waiter is not None, one of its callbacks
123 # must be _wakeup().
124
125 # Weak set containing all tasks alive.
126 _all_tasks = weakref.WeakSet()
127
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800128 # Dictionary containing tasks that are currently active in
129 # all running event loops. {EventLoop: Task}
130 _current_tasks = {}
131
132 @classmethod
133 def current_task(cls, loop=None):
134 """Return the currently running task in an event loop or None.
135
136 By default the current task for the current event loop is returned.
137
138 None is returned when called not in the context of a Task.
139 """
140 if loop is None:
141 loop = events.get_event_loop()
142 return cls._current_tasks.get(loop)
143
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700144 @classmethod
145 def all_tasks(cls, loop=None):
146 """Return a set of all tasks for an event loop.
147
148 By default all tasks for the current event loop are returned.
149 """
150 if loop is None:
151 loop = events.get_event_loop()
152 return {t for t in cls._all_tasks if t._loop is loop}
153
154 def __init__(self, coro, *, loop=None):
155 assert iscoroutine(coro), repr(coro) # Not a coroutine function!
156 super().__init__(loop=loop)
157 self._coro = iter(coro) # Use the iterator just in case.
158 self._fut_waiter = None
159 self._must_cancel = False
160 self._loop.call_soon(self._step)
161 self.__class__._all_tasks.add(self)
162
163 def __repr__(self):
164 res = super().__repr__()
165 if (self._must_cancel and
166 self._state == futures._PENDING and
167 '<PENDING' in res):
168 res = res.replace('<PENDING', '<CANCELLING', 1)
169 i = res.find('<')
170 if i < 0:
171 i = len(res)
172 res = res[:i] + '(<{}>)'.format(self._coro.__name__) + res[i:]
173 return res
174
175 def get_stack(self, *, limit=None):
176 """Return the list of stack frames for this task's coroutine.
177
178 If the coroutine is active, this returns the stack where it is
179 suspended. If the coroutine has completed successfully or was
180 cancelled, this returns an empty list. If the coroutine was
181 terminated by an exception, this returns the list of traceback
182 frames.
183
184 The frames are always ordered from oldest to newest.
185
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500186 The optional limit gives the maximum number of frames to
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700187 return; by default all available frames are returned. Its
188 meaning differs depending on whether a stack or a traceback is
189 returned: the newest frames of a stack are returned, but the
190 oldest frames of a traceback are returned. (This matches the
191 behavior of the traceback module.)
192
193 For reasons beyond our control, only one stack frame is
194 returned for a suspended coroutine.
195 """
196 frames = []
197 f = self._coro.gi_frame
198 if f is not None:
199 while f is not None:
200 if limit is not None:
201 if limit <= 0:
202 break
203 limit -= 1
204 frames.append(f)
205 f = f.f_back
206 frames.reverse()
207 elif self._exception is not None:
208 tb = self._exception.__traceback__
209 while tb is not None:
210 if limit is not None:
211 if limit <= 0:
212 break
213 limit -= 1
214 frames.append(tb.tb_frame)
215 tb = tb.tb_next
216 return frames
217
218 def print_stack(self, *, limit=None, file=None):
219 """Print the stack or traceback for this task's coroutine.
220
221 This produces output similar to that of the traceback module,
222 for the frames retrieved by get_stack(). The limit argument
223 is passed to get_stack(). The file argument is an I/O stream
224 to which the output goes; by default it goes to sys.stderr.
225 """
226 extracted_list = []
227 checked = set()
228 for f in self.get_stack(limit=limit):
229 lineno = f.f_lineno
230 co = f.f_code
231 filename = co.co_filename
232 name = co.co_name
233 if filename not in checked:
234 checked.add(filename)
235 linecache.checkcache(filename)
236 line = linecache.getline(filename, lineno, f.f_globals)
237 extracted_list.append((filename, lineno, name, line))
238 exc = self._exception
239 if not extracted_list:
240 print('No stack for %r' % self, file=file)
241 elif exc is not None:
242 print('Traceback for %r (most recent call last):' % self,
243 file=file)
244 else:
245 print('Stack for %r (most recent call last):' % self,
246 file=file)
247 traceback.print_list(extracted_list, file=file)
248 if exc is not None:
249 for line in traceback.format_exception_only(exc.__class__, exc):
250 print(line, file=file, end='')
251
252 def cancel(self):
Victor Stinner4bd652a2014-04-07 11:18:06 +0200253 """Request that a task to cancel itself.
254
255 This arranges for a CancellationError to be thrown into the
256 wrapped coroutine on the next cycle through the event loop.
257 The coroutine then has a chance to clean up or even deny
258 the request using try/except/finally.
259
260 Contrary to Future.cancel(), this does not guarantee that the
261 task will be cancelled: the exception might be caught and
262 acted upon, delaying cancellation of the task or preventing it
263 completely. The task may also return a value or raise a
264 different exception.
265
266 Immediately after this method is called, Task.cancelled() will
267 not return True (unless the task was already cancelled). A
268 task will be marked as cancelled when the wrapped coroutine
269 terminates with a CancelledError exception (even if cancel()
270 was not called).
271 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700272 if self.done():
273 return False
274 if self._fut_waiter is not None:
275 if self._fut_waiter.cancel():
276 # Leave self._fut_waiter; it may be a Task that
277 # catches and ignores the cancellation so we may have
278 # to cancel it again later.
279 return True
280 # It must be the case that self._step is already scheduled.
281 self._must_cancel = True
282 return True
283
284 def _step(self, value=None, exc=None):
285 assert not self.done(), \
286 '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
287 if self._must_cancel:
288 if not isinstance(exc, futures.CancelledError):
289 exc = futures.CancelledError()
290 self._must_cancel = False
291 coro = self._coro
292 self._fut_waiter = None
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800293
294 self.__class__._current_tasks[self._loop] = self
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700295 # Call either coro.throw(exc) or coro.send(value).
296 try:
297 if exc is not None:
298 result = coro.throw(exc)
299 elif value is not None:
300 result = coro.send(value)
301 else:
302 result = next(coro)
303 except StopIteration as exc:
304 self.set_result(exc.value)
305 except futures.CancelledError as exc:
306 super().cancel() # I.e., Future.cancel(self).
307 except Exception as exc:
308 self.set_exception(exc)
309 except BaseException as exc:
310 self.set_exception(exc)
311 raise
312 else:
313 if isinstance(result, futures.Future):
314 # Yielded Future must come from Future.__iter__().
315 if result._blocking:
316 result._blocking = False
317 result.add_done_callback(self._wakeup)
318 self._fut_waiter = result
319 if self._must_cancel:
320 if self._fut_waiter.cancel():
321 self._must_cancel = False
322 else:
323 self._loop.call_soon(
324 self._step, None,
325 RuntimeError(
326 'yield was used instead of yield from '
327 'in task {!r} with {!r}'.format(self, result)))
328 elif result is None:
329 # Bare yield relinquishes control for one event loop iteration.
330 self._loop.call_soon(self._step)
331 elif inspect.isgenerator(result):
332 # Yielding a generator is just wrong.
333 self._loop.call_soon(
334 self._step, None,
335 RuntimeError(
336 'yield was used instead of yield from for '
337 'generator in task {!r} with {}'.format(
338 self, result)))
339 else:
340 # Yielding something else is an error.
341 self._loop.call_soon(
342 self._step, None,
343 RuntimeError(
344 'Task got bad yield: {!r}'.format(result)))
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800345 finally:
346 self.__class__._current_tasks.pop(self._loop)
Victor Stinnerd74ac822014-03-04 23:07:08 +0100347 self = None # Needed to break cycles when an exception occurs.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700348
349 def _wakeup(self, future):
350 try:
351 value = future.result()
352 except Exception as exc:
353 # This may also be a cancellation.
354 self._step(None, exc)
355 else:
356 self._step(value, None)
357 self = None # Needed to break cycles when an exception occurs.
358
359
360# wait() and as_completed() similar to those in PEP 3148.
361
362FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
363FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
364ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
365
366
367@coroutine
368def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
369 """Wait for the Futures and coroutines given by fs to complete.
370
371 Coroutines will be wrapped in Tasks.
372
373 Returns two sets of Future: (done, pending).
374
375 Usage:
376
377 done, pending = yield from asyncio.wait(fs)
378
379 Note: This does not raise TimeoutError! Futures that aren't done
380 when the timeout occurs are returned in the second set.
381 """
Victor Stinnereb748762014-02-11 11:54:08 +0100382 if isinstance(fs, futures.Future) or iscoroutine(fs):
383 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700384 if not fs:
385 raise ValueError('Set of coroutines/Futures is empty.')
386
387 if loop is None:
388 loop = events.get_event_loop()
389
Yury Selivanov622be342014-02-06 22:06:16 -0500390 fs = {async(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700391
392 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
393 raise ValueError('Invalid return_when value: {}'.format(return_when))
394 return (yield from _wait(fs, timeout, return_when, loop))
395
396
397def _release_waiter(waiter, value=True, *args):
398 if not waiter.done():
399 waiter.set_result(value)
400
401
402@coroutine
403def wait_for(fut, timeout, *, loop=None):
404 """Wait for the single Future or coroutine to complete, with timeout.
405
406 Coroutine will be wrapped in Task.
407
Victor Stinner421e49b2014-01-23 17:40:59 +0100408 Returns result of the Future or coroutine. When a timeout occurs,
409 it cancels the task and raises TimeoutError. To avoid the task
410 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700411
412 Usage:
413
414 result = yield from asyncio.wait_for(fut, 10.0)
415
416 """
417 if loop is None:
418 loop = events.get_event_loop()
419
Guido van Rossum48c66c32014-01-29 14:30:38 -0800420 if timeout is None:
421 return (yield from fut)
422
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700423 waiter = futures.Future(loop=loop)
424 timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False)
425 cb = functools.partial(_release_waiter, waiter, True)
426
427 fut = async(fut, loop=loop)
428 fut.add_done_callback(cb)
429
430 try:
431 if (yield from waiter):
432 return fut.result()
433 else:
434 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100435 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700436 raise futures.TimeoutError()
437 finally:
438 timeout_handle.cancel()
439
440
441@coroutine
442def _wait(fs, timeout, return_when, loop):
443 """Internal helper for wait() and _wait_for().
444
445 The fs argument must be a collection of Futures.
446 """
447 assert fs, 'Set of Futures is empty.'
448 waiter = futures.Future(loop=loop)
449 timeout_handle = None
450 if timeout is not None:
451 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
452 counter = len(fs)
453
454 def _on_completion(f):
455 nonlocal counter
456 counter -= 1
457 if (counter <= 0 or
458 return_when == FIRST_COMPLETED or
459 return_when == FIRST_EXCEPTION and (not f.cancelled() and
460 f.exception() is not None)):
461 if timeout_handle is not None:
462 timeout_handle.cancel()
463 if not waiter.done():
464 waiter.set_result(False)
465
466 for f in fs:
467 f.add_done_callback(_on_completion)
468
469 try:
470 yield from waiter
471 finally:
472 if timeout_handle is not None:
473 timeout_handle.cancel()
474
475 done, pending = set(), set()
476 for f in fs:
477 f.remove_done_callback(_on_completion)
478 if f.done():
479 done.add(f)
480 else:
481 pending.add(f)
482 return done, pending
483
484
485# This is *not* a @coroutine! It is just an iterator (yielding Futures).
486def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800487 """Return an iterator whose values are coroutines.
488
489 When waiting for the yielded coroutines you'll get the results (or
490 exceptions!) of the original Futures (or coroutines), in the order
491 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700492
493 This differs from PEP 3148; the proper way to use this is:
494
495 for f in as_completed(fs):
496 result = yield from f # The 'yield from' may raise.
497 # Use result.
498
Guido van Rossumb58f0532014-02-12 17:58:19 -0800499 If a timeout is specified, the 'yield from' will raise
500 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700501
502 Note: The futures 'f' are not necessarily members of fs.
503 """
Victor Stinnereb748762014-02-11 11:54:08 +0100504 if isinstance(fs, futures.Future) or iscoroutine(fs):
505 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700506 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov622be342014-02-06 22:06:16 -0500507 todo = {async(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800508 from .queues import Queue # Import here to avoid circular import problem.
509 done = Queue(loop=loop)
510 timeout_handle = None
511
512 def _on_timeout():
513 for f in todo:
514 f.remove_done_callback(_on_completion)
515 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
516 todo.clear() # Can't do todo.remove(f) in the loop.
517
518 def _on_completion(f):
519 if not todo:
520 return # _on_timeout() was here first.
521 todo.remove(f)
522 done.put_nowait(f)
523 if not todo and timeout_handle is not None:
524 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700525
526 @coroutine
527 def _wait_for_one():
Guido van Rossumb58f0532014-02-12 17:58:19 -0800528 f = yield from done.get()
529 if f is None:
530 # Dummy value from _on_timeout().
531 raise futures.TimeoutError
532 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700533
Guido van Rossumb58f0532014-02-12 17:58:19 -0800534 for f in todo:
535 f.add_done_callback(_on_completion)
536 if todo and timeout is not None:
537 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700538 for _ in range(len(todo)):
539 yield _wait_for_one()
540
541
542@coroutine
543def sleep(delay, result=None, *, loop=None):
544 """Coroutine that completes after a given time (in seconds)."""
545 future = futures.Future(loop=loop)
546 h = future._loop.call_later(delay, future.set_result, result)
547 try:
548 return (yield from future)
549 finally:
550 h.cancel()
551
552
553def async(coro_or_future, *, loop=None):
554 """Wrap a coroutine in a future.
555
556 If the argument is a Future, it is returned directly.
557 """
558 if isinstance(coro_or_future, futures.Future):
559 if loop is not None and loop is not coro_or_future._loop:
560 raise ValueError('loop argument must agree with Future')
561 return coro_or_future
562 elif iscoroutine(coro_or_future):
563 return Task(coro_or_future, loop=loop)
564 else:
565 raise TypeError('A Future or coroutine is required')
566
567
568class _GatheringFuture(futures.Future):
569 """Helper for gather().
570
571 This overrides cancel() to cancel all the children and act more
572 like Task.cancel(), which doesn't immediately mark itself as
573 cancelled.
574 """
575
576 def __init__(self, children, *, loop=None):
577 super().__init__(loop=loop)
578 self._children = children
579
580 def cancel(self):
581 if self.done():
582 return False
583 for child in self._children:
584 child.cancel()
585 return True
586
587
588def gather(*coros_or_futures, loop=None, return_exceptions=False):
589 """Return a future aggregating results from the given coroutines
590 or futures.
591
592 All futures must share the same event loop. If all the tasks are
593 done successfully, the returned future's result is the list of
594 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500595 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700596 exceptions in the tasks are treated the same as successful
597 results, and gathered in the result list; otherwise, the first
598 raised exception will be immediately propagated to the returned
599 future.
600
601 Cancellation: if the outer Future is cancelled, all children (that
602 have not completed yet) are also cancelled. If any child is
603 cancelled, this is treated as if it raised CancelledError --
604 the outer Future is *not* cancelled in this case. (This is to
605 prevent the cancellation of one child to cause other children to
606 be cancelled.)
607 """
Yury Selivanov622be342014-02-06 22:06:16 -0500608 arg_to_fut = {arg: async(arg, loop=loop) for arg in set(coros_or_futures)}
609 children = [arg_to_fut[arg] for arg in coros_or_futures]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700610 n = len(children)
611 if n == 0:
612 outer = futures.Future(loop=loop)
613 outer.set_result([])
614 return outer
615 if loop is None:
616 loop = children[0]._loop
617 for fut in children:
618 if fut._loop is not loop:
619 raise ValueError("futures are tied to different event loops")
620 outer = _GatheringFuture(children, loop=loop)
621 nfinished = 0
622 results = [None] * n
623
624 def _done_callback(i, fut):
625 nonlocal nfinished
626 if outer._state != futures._PENDING:
627 if fut._exception is not None:
628 # Mark exception retrieved.
629 fut.exception()
630 return
631 if fut._state == futures._CANCELLED:
632 res = futures.CancelledError()
633 if not return_exceptions:
634 outer.set_exception(res)
635 return
636 elif fut._exception is not None:
637 res = fut.exception() # Mark exception retrieved.
638 if not return_exceptions:
639 outer.set_exception(res)
640 return
641 else:
642 res = fut._result
643 results[i] = res
644 nfinished += 1
645 if nfinished == n:
646 outer.set_result(results)
647
648 for i, fut in enumerate(children):
649 fut.add_done_callback(functools.partial(_done_callback, i))
650 return outer
651
652
653def shield(arg, *, loop=None):
654 """Wait for a future, shielding it from cancellation.
655
656 The statement
657
658 res = yield from shield(something())
659
660 is exactly equivalent to the statement
661
662 res = yield from something()
663
664 *except* that if the coroutine containing it is cancelled, the
665 task running in something() is not cancelled. From the POV of
666 something(), the cancellation did not happen. But its caller is
667 still cancelled, so the yield-from expression still raises
668 CancelledError. Note: If something() is cancelled by other means
669 this will still cancel shield().
670
671 If you want to completely ignore cancellation (not recommended)
672 you can combine shield() with a try/except clause, as follows:
673
674 try:
675 res = yield from shield(something())
676 except CancelledError:
677 res = None
678 """
679 inner = async(arg, loop=loop)
680 if inner.done():
681 # Shortcut.
682 return inner
683 loop = inner._loop
684 outer = futures.Future(loop=loop)
685
686 def _done_callback(inner):
687 if outer.cancelled():
688 # Mark inner's result as retrieved.
689 inner.cancelled() or inner.exception()
690 return
691 if inner.cancelled():
692 outer.cancel()
693 else:
694 exc = inner.exception()
695 if exc is not None:
696 outer.set_exception(exc)
697 else:
698 outer.set_result(inner.result())
699
700 inner.add_done_callback(_done_callback)
701 return outer