# blob: 5b8f3eb423ad6d6ffe25995678a219307973baf5 [file] [log] [blame]
"""Support for tasks, coroutines and the scheduler."""
2
# Public API of this module.
__all__ = ['Task',
           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
           'wait', 'wait_for', 'as_completed', 'sleep', 'async',
           'gather', 'shield',
           ]
8
import concurrent.futures
import functools
import inspect
import linecache
import sys
import traceback
import weakref

from . import coroutines
from . import events
from . import futures
from .coroutines import coroutine
from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
# Interpreter feature flags.  _PY34 gates the definition of Task.__del__
# (safe finalization of objects in reference cycles, PEP 442).
_PY34 = (sys.version_info >= (3, 4))
# NOTE(review): _PY35 is not referenced in the visible part of this module.
_PY35 = (sys.version_info >= (3, 5))
25
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026
class Task(futures.Future):
    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task is not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # Weak set containing all tasks alive.
    _all_tasks = weakref.WeakSet()

    # Dictionary containing tasks that are currently active in
    # all running event loops.  {EventLoop: Task}
    _current_tasks = {}

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        if loop is None:
            loop = events.get_event_loop()
        return cls._current_tasks.get(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        if loop is None:
            loop = events.get_event_loop()
        return {t for t in cls._all_tasks if t._loop is loop}

    def __init__(self, coro, *, loop=None):
        """Wrap the coroutine object *coro* and schedule its first step.

        *coro* must be a coroutine object (not a coroutine function).
        *loop* is passed through to the Future constructor.
        """
        assert coroutines.iscoroutine(coro), repr(coro)  # Not a coroutine function!
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the most recent entry of the recorded creation
            # traceback (presumably this constructor's own frame).
            del self._source_traceback[-1]
        self._coro = iter(coro)  # Use the iterator just in case.
        self._fut_waiter = None
        self._must_cancel = False
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    # On Python 3.3 or older, objects with a destructor that are part of a
    # reference cycle are never destroyed.  This is no longer the case on
    # Python 3.4 thanks to PEP 442.
    if _PY34:
        def __del__(self):
            """Report a task destroyed while pending, then defer to Future."""
            if self._state == futures._PENDING:
                context = {
                    'task': self,
                    'message': 'Task was destroyed but it is pending!',
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
            futures.Future.__del__(self)

    def __repr__(self):
        """Return '<ClassName state coroutine [result] [callbacks]>'."""
        info = []
        if self._must_cancel:
            info.append('cancelling')
        else:
            info.append(self._state.lower())

        info.append(coroutines._format_coroutine(self._coro))

        if self._state == futures._FINISHED:
            info.append(self._format_result())

        if self._callbacks:
            info.append(self._format_callbacks())

        return '<%s %s>' % (self.__class__.__name__, ' '.join(info))

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is active, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        frames = []
        f = self._coro.gi_frame
        if f is not None:
            # Walk from the newest frame outwards, then flip the list so
            # that the oldest frame comes first.
            while f is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(f)
                f = f.f_back
            frames.reverse()
        elif self._exception is not None:
            # Traceback entries are already linked oldest to newest.
            tb = self._exception.__traceback__
            while tb is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(tb.tb_frame)
                tb = tb.tb_next
        return frames

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output goes; by default it goes to sys.stderr.
        """
        extracted_list = []
        checked = set()
        for f in self.get_stack(limit=limit):
            lineno = f.f_lineno
            co = f.f_code
            filename = co.co_filename
            name = co.co_name
            if filename not in checked:
                # Refresh the line cache once per file.
                checked.add(filename)
                linecache.checkcache(filename)
            line = linecache.getline(filename, lineno, f.f_globals)
            extracted_list.append((filename, lineno, name, line))
        exc = self._exception
        if not extracted_list:
            print('No stack for %r' % self, file=file)
        elif exc is not None:
            print('Traceback for %r (most recent call last):' % self,
                  file=file)
        else:
            print('Stack for %r (most recent call last):' % self,
                  file=file)
        traceback.print_list(extracted_list, file=file)
        if exc is not None:
            for line in traceback.format_exception_only(exc.__class__, exc):
                print(line, file=file, end='')

    def cancel(self):
        """Request this task to cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Contrary to Future.cancel(), this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing it
        completely.  The task may also return a value or raise a
        different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Return False if the task is already done, True otherwise.
        """
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, value=None, exc=None):
        """Advance the wrapped coroutine by one step.

        Throws *exc* into the coroutine if given, otherwise sends
        *value* (or calls next() when value is None).  The outcome
        either completes this task or arranges for the next step.
        """
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
        if self._must_cancel:
            # A cancellation was requested: deliver CancelledError now.
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(value).
        try:
            if exc is not None:
                result = coro.throw(exc)
            elif value is not None:
                result = coro.send(value)
            else:
                result = next(coro)
        except StopIteration as exc:
            self.set_result(exc.value)
        except futures.CancelledError as exc:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            self.set_exception(exc)
            raise
        else:
            if isinstance(result, futures.Future):
                # Yielded Future must come from Future.__iter__().
                if result._blocking:
                    result._blocking = False
                    result.add_done_callback(self._wakeup)
                    self._fut_waiter = result
                    if self._must_cancel:
                        # cancel() was called while the coroutine ran;
                        # forward the request to the new waiter.
                        if self._fut_waiter.cancel():
                            self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step, None,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        """Done callback for the awaited future: resume the coroutine.

        The future's result is sent into the coroutine, or its
        exception is thrown into it, via _step().
        """
        try:
            value = future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(None, exc)
        else:
            self._step(value, None)
        self = None  # Needed to break cycles when an exception occurs.
294
295
# wait() and as_completed() similar to those in PEP 3148.

# Re-exported so callers can pass them as wait()'s return_when argument.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
301
302
303@coroutine
304def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
305 """Wait for the Futures and coroutines given by fs to complete.
306
Victor Stinnerdb74d982014-06-10 11:16:05 +0200307 The sequence futures must not be empty.
308
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700309 Coroutines will be wrapped in Tasks.
310
311 Returns two sets of Future: (done, pending).
312
313 Usage:
314
315 done, pending = yield from asyncio.wait(fs)
316
317 Note: This does not raise TimeoutError! Futures that aren't done
318 when the timeout occurs are returned in the second set.
319 """
Victor Stinnerf951d282014-06-29 00:46:45 +0200320 if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100321 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700322 if not fs:
323 raise ValueError('Set of coroutines/Futures is empty.')
324
325 if loop is None:
326 loop = events.get_event_loop()
327
Yury Selivanov622be342014-02-06 22:06:16 -0500328 fs = {async(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700329
330 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
331 raise ValueError('Invalid return_when value: {}'.format(return_when))
332 return (yield from _wait(fs, timeout, return_when, loop))
333
334
def _release_waiter(waiter, value=True, *args):
    """Set *value* as the result of *waiter* unless it is already done.

    Extra positional arguments are accepted and ignored, so this can be
    used (via functools.partial) as a done callback, which is called
    with the completed future.
    """
    if not waiter.done():
        waiter.set_result(value)
338
339
340@coroutine
341def wait_for(fut, timeout, *, loop=None):
342 """Wait for the single Future or coroutine to complete, with timeout.
343
344 Coroutine will be wrapped in Task.
345
Victor Stinner421e49b2014-01-23 17:40:59 +0100346 Returns result of the Future or coroutine. When a timeout occurs,
347 it cancels the task and raises TimeoutError. To avoid the task
348 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700349
350 Usage:
351
352 result = yield from asyncio.wait_for(fut, 10.0)
353
354 """
355 if loop is None:
356 loop = events.get_event_loop()
357
Guido van Rossum48c66c32014-01-29 14:30:38 -0800358 if timeout is None:
359 return (yield from fut)
360
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700361 waiter = futures.Future(loop=loop)
362 timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False)
363 cb = functools.partial(_release_waiter, waiter, True)
364
365 fut = async(fut, loop=loop)
366 fut.add_done_callback(cb)
367
368 try:
369 if (yield from waiter):
370 return fut.result()
371 else:
372 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100373 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700374 raise futures.TimeoutError()
375 finally:
376 timeout_handle.cancel()
377
378
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a collection of Futures.

    Returns a (done, pending) pair of sets once the return_when
    condition is met or the timeout (if not None) expires.
    """
    assert fs, 'Set of Futures is empty.'
    # Only the completion of the waiter matters; its result value is
    # never inspected.
    waiter = futures.Future(loop=loop)
    timeout_handle = None
    if timeout is not None:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)

    def _on_completion(f):
        nonlocal counter
        counter -= 1
        if (counter <= 0 or
                return_when == FIRST_COMPLETED or
                (return_when == FIRST_EXCEPTION and
                 (not f.cancelled() and f.exception() is not None))):
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(False)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()

    done, pending = set(), set()
    for f in fs:
        f.remove_done_callback(_on_completion)
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending
421
422
423# This is *not* a @coroutine! It is just an iterator (yielding Futures).
424def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800425 """Return an iterator whose values are coroutines.
426
427 When waiting for the yielded coroutines you'll get the results (or
428 exceptions!) of the original Futures (or coroutines), in the order
429 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700430
431 This differs from PEP 3148; the proper way to use this is:
432
433 for f in as_completed(fs):
434 result = yield from f # The 'yield from' may raise.
435 # Use result.
436
Guido van Rossumb58f0532014-02-12 17:58:19 -0800437 If a timeout is specified, the 'yield from' will raise
438 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700439
440 Note: The futures 'f' are not necessarily members of fs.
441 """
Victor Stinnerf951d282014-06-29 00:46:45 +0200442 if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100443 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700444 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov622be342014-02-06 22:06:16 -0500445 todo = {async(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800446 from .queues import Queue # Import here to avoid circular import problem.
447 done = Queue(loop=loop)
448 timeout_handle = None
449
450 def _on_timeout():
451 for f in todo:
452 f.remove_done_callback(_on_completion)
453 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
454 todo.clear() # Can't do todo.remove(f) in the loop.
455
456 def _on_completion(f):
457 if not todo:
458 return # _on_timeout() was here first.
459 todo.remove(f)
460 done.put_nowait(f)
461 if not todo and timeout_handle is not None:
462 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700463
464 @coroutine
465 def _wait_for_one():
Guido van Rossumb58f0532014-02-12 17:58:19 -0800466 f = yield from done.get()
467 if f is None:
468 # Dummy value from _on_timeout().
469 raise futures.TimeoutError
470 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700471
Guido van Rossumb58f0532014-02-12 17:58:19 -0800472 for f in todo:
473 f.add_done_callback(_on_completion)
474 if todo and timeout is not None:
475 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700476 for _ in range(len(todo)):
477 yield _wait_for_one()
478
479
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    *result* is returned to the caller once the delay has elapsed.
    """
    future = futures.Future(loop=loop)
    h = future._loop.call_later(delay, future.set_result, result)
    try:
        return (yield from future)
    finally:
        # Also cancels the timer when the wait ends early, so that
        # set_result() is not called afterwards.
        h.cancel()
489
490
491def async(coro_or_future, *, loop=None):
492 """Wrap a coroutine in a future.
493
494 If the argument is a Future, it is returned directly.
495 """
496 if isinstance(coro_or_future, futures.Future):
497 if loop is not None and loop is not coro_or_future._loop:
498 raise ValueError('loop argument must agree with Future')
499 return coro_or_future
Victor Stinnerf951d282014-06-29 00:46:45 +0200500 elif coroutines.iscoroutine(coro_or_future):
Victor Stinner80f53aa2014-06-27 13:52:20 +0200501 task = Task(coro_or_future, loop=loop)
502 if task._source_traceback:
503 del task._source_traceback[-1]
504 return task
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700505 else:
506 raise TypeError('A Future or coroutine is required')
507
508
class _GatheringFuture(futures.Future):
    """Helper for gather().

    This overrides cancel() to cancel all the children and act more
    like Task.cancel(), which doesn't immediately mark itself as
    cancelled.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children

    def cancel(self):
        """Cancel every child; return False if already done, else True."""
        if self.done():
            return False
        for child in self._children:
            child.cancel()
        return True
527
528
529def gather(*coros_or_futures, loop=None, return_exceptions=False):
530 """Return a future aggregating results from the given coroutines
531 or futures.
532
533 All futures must share the same event loop. If all the tasks are
534 done successfully, the returned future's result is the list of
535 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500536 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700537 exceptions in the tasks are treated the same as successful
538 results, and gathered in the result list; otherwise, the first
539 raised exception will be immediately propagated to the returned
540 future.
541
542 Cancellation: if the outer Future is cancelled, all children (that
543 have not completed yet) are also cancelled. If any child is
544 cancelled, this is treated as if it raised CancelledError --
545 the outer Future is *not* cancelled in this case. (This is to
546 prevent the cancellation of one child to cause other children to
547 be cancelled.)
548 """
Yury Selivanov622be342014-02-06 22:06:16 -0500549 arg_to_fut = {arg: async(arg, loop=loop) for arg in set(coros_or_futures)}
550 children = [arg_to_fut[arg] for arg in coros_or_futures]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700551 n = len(children)
552 if n == 0:
553 outer = futures.Future(loop=loop)
554 outer.set_result([])
555 return outer
556 if loop is None:
557 loop = children[0]._loop
558 for fut in children:
559 if fut._loop is not loop:
560 raise ValueError("futures are tied to different event loops")
561 outer = _GatheringFuture(children, loop=loop)
562 nfinished = 0
563 results = [None] * n
564
565 def _done_callback(i, fut):
566 nonlocal nfinished
567 if outer._state != futures._PENDING:
568 if fut._exception is not None:
569 # Mark exception retrieved.
570 fut.exception()
571 return
572 if fut._state == futures._CANCELLED:
573 res = futures.CancelledError()
574 if not return_exceptions:
575 outer.set_exception(res)
576 return
577 elif fut._exception is not None:
578 res = fut.exception() # Mark exception retrieved.
579 if not return_exceptions:
580 outer.set_exception(res)
581 return
582 else:
583 res = fut._result
584 results[i] = res
585 nfinished += 1
586 if nfinished == n:
587 outer.set_result(results)
588
589 for i, fut in enumerate(children):
590 fut.add_done_callback(functools.partial(_done_callback, i))
591 return outer
592
593
594def shield(arg, *, loop=None):
595 """Wait for a future, shielding it from cancellation.
596
597 The statement
598
599 res = yield from shield(something())
600
601 is exactly equivalent to the statement
602
603 res = yield from something()
604
605 *except* that if the coroutine containing it is cancelled, the
606 task running in something() is not cancelled. From the POV of
607 something(), the cancellation did not happen. But its caller is
608 still cancelled, so the yield-from expression still raises
609 CancelledError. Note: If something() is cancelled by other means
610 this will still cancel shield().
611
612 If you want to completely ignore cancellation (not recommended)
613 you can combine shield() with a try/except clause, as follows:
614
615 try:
616 res = yield from shield(something())
617 except CancelledError:
618 res = None
619 """
620 inner = async(arg, loop=loop)
621 if inner.done():
622 # Shortcut.
623 return inner
624 loop = inner._loop
625 outer = futures.Future(loop=loop)
626
627 def _done_callback(inner):
628 if outer.cancelled():
629 # Mark inner's result as retrieved.
630 inner.cancelled() or inner.exception()
631 return
632 if inner.cancelled():
633 outer.cancel()
634 else:
635 exc = inner.exception()
636 if exc is not None:
637 outer.set_exception(exc)
638 else:
639 outer.set_result(inner.result())
640
641 inner.add_done_callback(_done_callback)
642 return outer