blob: e9adf1dfb12aa9315166f962849dff14e599e34a [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
Victor Stinnerf951d282014-06-29 00:46:45 +02003__all__ = ['Task',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
5 'wait', 'wait_for', 'as_completed', 'sleep', 'async',
Guido van Rossumde3a1362013-11-29 09:29:00 -08006 'gather', 'shield',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007 ]
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import concurrent.futures
10import functools
11import inspect
12import linecache
Victor Stinner7ef60cd2014-02-19 23:15:02 +010013import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import traceback
15import weakref
16
Victor Stinnerf951d282014-06-29 00:46:45 +020017from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import events
19from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020020from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
Victor Stinnera02f81f2014-06-24 22:37:53 +020023_PY34 = (sys.version_info >= (3, 4))
Victor Stinner8d3e02e2014-06-18 01:14:59 +020024_PY35 = (sys.version_info >= (3, 5))
25
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027class Task(futures.Future):
28 """A coroutine wrapped in a Future."""
29
30 # An important invariant maintained while a Task not done:
31 #
32 # - Either _fut_waiter is None, and _step() is scheduled;
33 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
34 #
35 # The only transition from the latter to the former is through
36 # _wakeup(). When _fut_waiter is not None, one of its callbacks
37 # must be _wakeup().
38
39 # Weak set containing all tasks alive.
40 _all_tasks = weakref.WeakSet()
41
Guido van Rossum1a605ed2013-12-06 12:57:40 -080042 # Dictionary containing tasks that are currently active in
43 # all running event loops. {EventLoop: Task}
44 _current_tasks = {}
45
46 @classmethod
47 def current_task(cls, loop=None):
48 """Return the currently running task in an event loop or None.
49
50 By default the current task for the current event loop is returned.
51
52 None is returned when called not in the context of a Task.
53 """
54 if loop is None:
55 loop = events.get_event_loop()
56 return cls._current_tasks.get(loop)
57
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070058 @classmethod
59 def all_tasks(cls, loop=None):
60 """Return a set of all tasks for an event loop.
61
62 By default all tasks for the current event loop are returned.
63 """
64 if loop is None:
65 loop = events.get_event_loop()
66 return {t for t in cls._all_tasks if t._loop is loop}
67
68 def __init__(self, coro, *, loop=None):
Victor Stinnerf951d282014-06-29 00:46:45 +020069 assert coroutines.iscoroutine(coro), repr(coro) # Not a coroutine function!
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070070 super().__init__(loop=loop)
Victor Stinner80f53aa2014-06-27 13:52:20 +020071 if self._source_traceback:
72 del self._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070073 self._coro = iter(coro) # Use the iterator just in case.
74 self._fut_waiter = None
75 self._must_cancel = False
76 self._loop.call_soon(self._step)
77 self.__class__._all_tasks.add(self)
Victor Stinner98b63912014-06-30 14:51:04 +020078 # If False, don't log a message if the task is destroyed whereas its
79 # status is still pending
80 self._log_destroy_pending = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070081
Victor Stinnera02f81f2014-06-24 22:37:53 +020082 # On Python 3.3 or older, objects with a destructor part of a reference
83 # cycle are never destroyed. It's not more the case on Python 3.4 thanks to
84 # the PEP 442.
85 if _PY34:
86 def __del__(self):
Victor Stinner98b63912014-06-30 14:51:04 +020087 if self._state == futures._PENDING and self._log_destroy_pending:
Victor Stinner80f53aa2014-06-27 13:52:20 +020088 context = {
Victor Stinnera02f81f2014-06-24 22:37:53 +020089 'task': self,
90 'message': 'Task was destroyed but it is pending!',
Victor Stinner80f53aa2014-06-27 13:52:20 +020091 }
92 if self._source_traceback:
93 context['source_traceback'] = self._source_traceback
94 self._loop.call_exception_handler(context)
Victor Stinnera02f81f2014-06-24 22:37:53 +020095 futures.Future.__del__(self)
96
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070097 def __repr__(self):
Victor Stinner975735f2014-06-25 21:41:58 +020098 info = []
99 if self._must_cancel:
100 info.append('cancelling')
101 else:
102 info.append(self._state.lower())
103
Victor Stinnerf951d282014-06-29 00:46:45 +0200104 info.append(coroutines._format_coroutine(self._coro))
Victor Stinner975735f2014-06-25 21:41:58 +0200105
106 if self._state == futures._FINISHED:
107 info.append(self._format_result())
108
109 if self._callbacks:
110 info.append(self._format_callbacks())
111
112 return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700113
114 def get_stack(self, *, limit=None):
115 """Return the list of stack frames for this task's coroutine.
116
117 If the coroutine is active, this returns the stack where it is
118 suspended. If the coroutine has completed successfully or was
119 cancelled, this returns an empty list. If the coroutine was
120 terminated by an exception, this returns the list of traceback
121 frames.
122
123 The frames are always ordered from oldest to newest.
124
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500125 The optional limit gives the maximum number of frames to
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700126 return; by default all available frames are returned. Its
127 meaning differs depending on whether a stack or a traceback is
128 returned: the newest frames of a stack are returned, but the
129 oldest frames of a traceback are returned. (This matches the
130 behavior of the traceback module.)
131
132 For reasons beyond our control, only one stack frame is
133 returned for a suspended coroutine.
134 """
135 frames = []
136 f = self._coro.gi_frame
137 if f is not None:
138 while f is not None:
139 if limit is not None:
140 if limit <= 0:
141 break
142 limit -= 1
143 frames.append(f)
144 f = f.f_back
145 frames.reverse()
146 elif self._exception is not None:
147 tb = self._exception.__traceback__
148 while tb is not None:
149 if limit is not None:
150 if limit <= 0:
151 break
152 limit -= 1
153 frames.append(tb.tb_frame)
154 tb = tb.tb_next
155 return frames
156
157 def print_stack(self, *, limit=None, file=None):
158 """Print the stack or traceback for this task's coroutine.
159
160 This produces output similar to that of the traceback module,
161 for the frames retrieved by get_stack(). The limit argument
162 is passed to get_stack(). The file argument is an I/O stream
163 to which the output goes; by default it goes to sys.stderr.
164 """
165 extracted_list = []
166 checked = set()
167 for f in self.get_stack(limit=limit):
168 lineno = f.f_lineno
169 co = f.f_code
170 filename = co.co_filename
171 name = co.co_name
172 if filename not in checked:
173 checked.add(filename)
174 linecache.checkcache(filename)
175 line = linecache.getline(filename, lineno, f.f_globals)
176 extracted_list.append((filename, lineno, name, line))
177 exc = self._exception
178 if not extracted_list:
179 print('No stack for %r' % self, file=file)
180 elif exc is not None:
181 print('Traceback for %r (most recent call last):' % self,
182 file=file)
183 else:
184 print('Stack for %r (most recent call last):' % self,
185 file=file)
186 traceback.print_list(extracted_list, file=file)
187 if exc is not None:
188 for line in traceback.format_exception_only(exc.__class__, exc):
189 print(line, file=file, end='')
190
191 def cancel(self):
Victor Stinner8d213572014-06-02 23:06:46 +0200192 """Request this task to cancel itself.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200193
Victor Stinner8d213572014-06-02 23:06:46 +0200194 This arranges for a CancelledError to be thrown into the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200195 wrapped coroutine on the next cycle through the event loop.
196 The coroutine then has a chance to clean up or even deny
197 the request using try/except/finally.
198
199 Contrary to Future.cancel(), this does not guarantee that the
200 task will be cancelled: the exception might be caught and
201 acted upon, delaying cancellation of the task or preventing it
202 completely. The task may also return a value or raise a
203 different exception.
204
205 Immediately after this method is called, Task.cancelled() will
206 not return True (unless the task was already cancelled). A
207 task will be marked as cancelled when the wrapped coroutine
208 terminates with a CancelledError exception (even if cancel()
209 was not called).
210 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700211 if self.done():
212 return False
213 if self._fut_waiter is not None:
214 if self._fut_waiter.cancel():
215 # Leave self._fut_waiter; it may be a Task that
216 # catches and ignores the cancellation so we may have
217 # to cancel it again later.
218 return True
219 # It must be the case that self._step is already scheduled.
220 self._must_cancel = True
221 return True
222
223 def _step(self, value=None, exc=None):
224 assert not self.done(), \
225 '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
226 if self._must_cancel:
227 if not isinstance(exc, futures.CancelledError):
228 exc = futures.CancelledError()
229 self._must_cancel = False
230 coro = self._coro
231 self._fut_waiter = None
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800232
233 self.__class__._current_tasks[self._loop] = self
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700234 # Call either coro.throw(exc) or coro.send(value).
235 try:
236 if exc is not None:
237 result = coro.throw(exc)
238 elif value is not None:
239 result = coro.send(value)
240 else:
241 result = next(coro)
242 except StopIteration as exc:
243 self.set_result(exc.value)
244 except futures.CancelledError as exc:
245 super().cancel() # I.e., Future.cancel(self).
246 except Exception as exc:
247 self.set_exception(exc)
248 except BaseException as exc:
249 self.set_exception(exc)
250 raise
251 else:
252 if isinstance(result, futures.Future):
253 # Yielded Future must come from Future.__iter__().
254 if result._blocking:
255 result._blocking = False
256 result.add_done_callback(self._wakeup)
257 self._fut_waiter = result
258 if self._must_cancel:
259 if self._fut_waiter.cancel():
260 self._must_cancel = False
261 else:
262 self._loop.call_soon(
263 self._step, None,
264 RuntimeError(
265 'yield was used instead of yield from '
266 'in task {!r} with {!r}'.format(self, result)))
267 elif result is None:
268 # Bare yield relinquishes control for one event loop iteration.
269 self._loop.call_soon(self._step)
270 elif inspect.isgenerator(result):
271 # Yielding a generator is just wrong.
272 self._loop.call_soon(
273 self._step, None,
274 RuntimeError(
275 'yield was used instead of yield from for '
276 'generator in task {!r} with {}'.format(
277 self, result)))
278 else:
279 # Yielding something else is an error.
280 self._loop.call_soon(
281 self._step, None,
282 RuntimeError(
283 'Task got bad yield: {!r}'.format(result)))
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800284 finally:
285 self.__class__._current_tasks.pop(self._loop)
Victor Stinnerd74ac822014-03-04 23:07:08 +0100286 self = None # Needed to break cycles when an exception occurs.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700287
288 def _wakeup(self, future):
289 try:
290 value = future.result()
291 except Exception as exc:
292 # This may also be a cancellation.
293 self._step(None, exc)
294 else:
295 self._step(value, None)
296 self = None # Needed to break cycles when an exception occurs.
297
298
299# wait() and as_completed() similar to those in PEP 3148.
300
301FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
302FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
303ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
304
305
306@coroutine
307def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
308 """Wait for the Futures and coroutines given by fs to complete.
309
Victor Stinnerdb74d982014-06-10 11:16:05 +0200310 The sequence futures must not be empty.
311
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700312 Coroutines will be wrapped in Tasks.
313
314 Returns two sets of Future: (done, pending).
315
316 Usage:
317
318 done, pending = yield from asyncio.wait(fs)
319
320 Note: This does not raise TimeoutError! Futures that aren't done
321 when the timeout occurs are returned in the second set.
322 """
Victor Stinnerf951d282014-06-29 00:46:45 +0200323 if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100324 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700325 if not fs:
326 raise ValueError('Set of coroutines/Futures is empty.')
327
328 if loop is None:
329 loop = events.get_event_loop()
330
Yury Selivanov622be342014-02-06 22:06:16 -0500331 fs = {async(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700332
333 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
334 raise ValueError('Invalid return_when value: {}'.format(return_when))
335 return (yield from _wait(fs, timeout, return_when, loop))
336
337
338def _release_waiter(waiter, value=True, *args):
339 if not waiter.done():
340 waiter.set_result(value)
341
342
343@coroutine
344def wait_for(fut, timeout, *, loop=None):
345 """Wait for the single Future or coroutine to complete, with timeout.
346
347 Coroutine will be wrapped in Task.
348
Victor Stinner421e49b2014-01-23 17:40:59 +0100349 Returns result of the Future or coroutine. When a timeout occurs,
350 it cancels the task and raises TimeoutError. To avoid the task
351 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700352
353 Usage:
354
355 result = yield from asyncio.wait_for(fut, 10.0)
356
357 """
358 if loop is None:
359 loop = events.get_event_loop()
360
Guido van Rossum48c66c32014-01-29 14:30:38 -0800361 if timeout is None:
362 return (yield from fut)
363
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700364 waiter = futures.Future(loop=loop)
365 timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False)
366 cb = functools.partial(_release_waiter, waiter, True)
367
368 fut = async(fut, loop=loop)
369 fut.add_done_callback(cb)
370
371 try:
372 if (yield from waiter):
373 return fut.result()
374 else:
375 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100376 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700377 raise futures.TimeoutError()
378 finally:
379 timeout_handle.cancel()
380
381
382@coroutine
383def _wait(fs, timeout, return_when, loop):
384 """Internal helper for wait() and _wait_for().
385
386 The fs argument must be a collection of Futures.
387 """
388 assert fs, 'Set of Futures is empty.'
389 waiter = futures.Future(loop=loop)
390 timeout_handle = None
391 if timeout is not None:
392 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
393 counter = len(fs)
394
395 def _on_completion(f):
396 nonlocal counter
397 counter -= 1
398 if (counter <= 0 or
399 return_when == FIRST_COMPLETED or
400 return_when == FIRST_EXCEPTION and (not f.cancelled() and
401 f.exception() is not None)):
402 if timeout_handle is not None:
403 timeout_handle.cancel()
404 if not waiter.done():
405 waiter.set_result(False)
406
407 for f in fs:
408 f.add_done_callback(_on_completion)
409
410 try:
411 yield from waiter
412 finally:
413 if timeout_handle is not None:
414 timeout_handle.cancel()
415
416 done, pending = set(), set()
417 for f in fs:
418 f.remove_done_callback(_on_completion)
419 if f.done():
420 done.add(f)
421 else:
422 pending.add(f)
423 return done, pending
424
425
426# This is *not* a @coroutine! It is just an iterator (yielding Futures).
427def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800428 """Return an iterator whose values are coroutines.
429
430 When waiting for the yielded coroutines you'll get the results (or
431 exceptions!) of the original Futures (or coroutines), in the order
432 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700433
434 This differs from PEP 3148; the proper way to use this is:
435
436 for f in as_completed(fs):
437 result = yield from f # The 'yield from' may raise.
438 # Use result.
439
Guido van Rossumb58f0532014-02-12 17:58:19 -0800440 If a timeout is specified, the 'yield from' will raise
441 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700442
443 Note: The futures 'f' are not necessarily members of fs.
444 """
Victor Stinnerf951d282014-06-29 00:46:45 +0200445 if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100446 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700447 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov622be342014-02-06 22:06:16 -0500448 todo = {async(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800449 from .queues import Queue # Import here to avoid circular import problem.
450 done = Queue(loop=loop)
451 timeout_handle = None
452
453 def _on_timeout():
454 for f in todo:
455 f.remove_done_callback(_on_completion)
456 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
457 todo.clear() # Can't do todo.remove(f) in the loop.
458
459 def _on_completion(f):
460 if not todo:
461 return # _on_timeout() was here first.
462 todo.remove(f)
463 done.put_nowait(f)
464 if not todo and timeout_handle is not None:
465 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700466
467 @coroutine
468 def _wait_for_one():
Guido van Rossumb58f0532014-02-12 17:58:19 -0800469 f = yield from done.get()
470 if f is None:
471 # Dummy value from _on_timeout().
472 raise futures.TimeoutError
473 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700474
Guido van Rossumb58f0532014-02-12 17:58:19 -0800475 for f in todo:
476 f.add_done_callback(_on_completion)
477 if todo and timeout is not None:
478 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700479 for _ in range(len(todo)):
480 yield _wait_for_one()
481
482
483@coroutine
484def sleep(delay, result=None, *, loop=None):
485 """Coroutine that completes after a given time (in seconds)."""
486 future = futures.Future(loop=loop)
487 h = future._loop.call_later(delay, future.set_result, result)
488 try:
489 return (yield from future)
490 finally:
491 h.cancel()
492
493
494def async(coro_or_future, *, loop=None):
495 """Wrap a coroutine in a future.
496
497 If the argument is a Future, it is returned directly.
498 """
499 if isinstance(coro_or_future, futures.Future):
500 if loop is not None and loop is not coro_or_future._loop:
501 raise ValueError('loop argument must agree with Future')
502 return coro_or_future
Victor Stinnerf951d282014-06-29 00:46:45 +0200503 elif coroutines.iscoroutine(coro_or_future):
Victor Stinner80f53aa2014-06-27 13:52:20 +0200504 task = Task(coro_or_future, loop=loop)
505 if task._source_traceback:
506 del task._source_traceback[-1]
507 return task
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700508 else:
509 raise TypeError('A Future or coroutine is required')
510
511
512class _GatheringFuture(futures.Future):
513 """Helper for gather().
514
515 This overrides cancel() to cancel all the children and act more
516 like Task.cancel(), which doesn't immediately mark itself as
517 cancelled.
518 """
519
520 def __init__(self, children, *, loop=None):
521 super().__init__(loop=loop)
522 self._children = children
523
524 def cancel(self):
525 if self.done():
526 return False
527 for child in self._children:
528 child.cancel()
529 return True
530
531
532def gather(*coros_or_futures, loop=None, return_exceptions=False):
533 """Return a future aggregating results from the given coroutines
534 or futures.
535
536 All futures must share the same event loop. If all the tasks are
537 done successfully, the returned future's result is the list of
538 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500539 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700540 exceptions in the tasks are treated the same as successful
541 results, and gathered in the result list; otherwise, the first
542 raised exception will be immediately propagated to the returned
543 future.
544
545 Cancellation: if the outer Future is cancelled, all children (that
546 have not completed yet) are also cancelled. If any child is
547 cancelled, this is treated as if it raised CancelledError --
548 the outer Future is *not* cancelled in this case. (This is to
549 prevent the cancellation of one child to cause other children to
550 be cancelled.)
551 """
Yury Selivanov622be342014-02-06 22:06:16 -0500552 arg_to_fut = {arg: async(arg, loop=loop) for arg in set(coros_or_futures)}
553 children = [arg_to_fut[arg] for arg in coros_or_futures]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700554 n = len(children)
555 if n == 0:
556 outer = futures.Future(loop=loop)
557 outer.set_result([])
558 return outer
559 if loop is None:
560 loop = children[0]._loop
561 for fut in children:
562 if fut._loop is not loop:
563 raise ValueError("futures are tied to different event loops")
564 outer = _GatheringFuture(children, loop=loop)
565 nfinished = 0
566 results = [None] * n
567
568 def _done_callback(i, fut):
569 nonlocal nfinished
570 if outer._state != futures._PENDING:
571 if fut._exception is not None:
572 # Mark exception retrieved.
573 fut.exception()
574 return
575 if fut._state == futures._CANCELLED:
576 res = futures.CancelledError()
577 if not return_exceptions:
578 outer.set_exception(res)
579 return
580 elif fut._exception is not None:
581 res = fut.exception() # Mark exception retrieved.
582 if not return_exceptions:
583 outer.set_exception(res)
584 return
585 else:
586 res = fut._result
587 results[i] = res
588 nfinished += 1
589 if nfinished == n:
590 outer.set_result(results)
591
592 for i, fut in enumerate(children):
593 fut.add_done_callback(functools.partial(_done_callback, i))
594 return outer
595
596
597def shield(arg, *, loop=None):
598 """Wait for a future, shielding it from cancellation.
599
600 The statement
601
602 res = yield from shield(something())
603
604 is exactly equivalent to the statement
605
606 res = yield from something()
607
608 *except* that if the coroutine containing it is cancelled, the
609 task running in something() is not cancelled. From the POV of
610 something(), the cancellation did not happen. But its caller is
611 still cancelled, so the yield-from expression still raises
612 CancelledError. Note: If something() is cancelled by other means
613 this will still cancel shield().
614
615 If you want to completely ignore cancellation (not recommended)
616 you can combine shield() with a try/except clause, as follows:
617
618 try:
619 res = yield from shield(something())
620 except CancelledError:
621 res = None
622 """
623 inner = async(arg, loop=loop)
624 if inner.done():
625 # Shortcut.
626 return inner
627 loop = inner._loop
628 outer = futures.Future(loop=loop)
629
630 def _done_callback(inner):
631 if outer.cancelled():
632 # Mark inner's result as retrieved.
633 inner.cancelled() or inner.exception()
634 return
635 if inner.cancelled():
636 outer.cancel()
637 else:
638 exc = inner.exception()
639 if exc is not None:
640 outer.set_exception(exc)
641 else:
642 outer.set_result(inner.result())
643
644 inner.add_done_callback(_done_callback)
645 return outer