"""Support for tasks, coroutines and the scheduler."""

__all__ = ['Task',
           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
           'wait', 'wait_for', 'as_completed', 'sleep', 'async',
           'gather', 'shield',
           ]

import concurrent.futures
import functools
import inspect
import linecache
import sys
import traceback
import weakref

from . import coroutines
from . import events
from . import futures
from .coroutines import coroutine
from .log import logger

# True when running on Python 3.4 or newer; used to decide whether
# Task defines a __del__() method.
_PY34 = (sys.version_info >= (3, 4))


class Task(futures.Future):
    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # Weak set containing all tasks alive.
    _all_tasks = weakref.WeakSet()

    # Dictionary containing tasks that are currently active in
    # all running event loops.  {EventLoop: Task}
    _current_tasks = {}

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        if loop is None:
            loop = events.get_event_loop()
        return cls._current_tasks.get(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        if loop is None:
            loop = events.get_event_loop()
        return {t for t in cls._all_tasks if t._loop is loop}

    def __init__(self, coro, *, loop=None):
        """Wrap the coroutine object *coro* and schedule its first step.

        *coro* must be a coroutine object (not a coroutine function).
        The first call to _step() is queued with loop.call_soon() and
        the task is registered in the class-wide _all_tasks weak set.
        """
        assert coroutines.iscoroutine(coro), repr(coro)  # Not a coroutine function!
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the last recorded frame (this constructor's own).
            del self._source_traceback[-1]
        self._coro = iter(coro)  # Use the iterator just in case.
        self._fut_waiter = None
        self._must_cancel = False
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)
        # If False, don't log a message if the task is destroyed whereas its
        # status is still pending
        self._log_destroy_pending = True

    # On Python 3.3 or older, objects with a destructor that are part of a
    # reference cycle are never destroyed.  That is no longer the case on
    # Python 3.4, thanks to PEP 442.
    if _PY34:
        def __del__(self):
            """Report a task that is destroyed while still pending.

            The report goes to the loop's exception handler and is
            skipped when _log_destroy_pending is false.
            """
            if self._state == futures._PENDING and self._log_destroy_pending:
                context = {
                    'task': self,
                    'message': 'Task was destroyed but it is pending!',
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
            futures.Future.__del__(self)

    def __repr__(self):
        """Return a description of the task's state for debugging.

        Includes the state (or 'cancelling' while a cancellation
        request is pending), the coroutine, the creation location when
        a source traceback was recorded, the result once finished, the
        callbacks, and the future currently being waited for.
        """
        info = []
        if self._must_cancel:
            info.append('cancelling')
        else:
            info.append(self._state.lower())

        coro = coroutines._format_coroutine(self._coro)
        info.append('coro=<%s>' % coro)

        if self._source_traceback:
            frame = self._source_traceback[-1]
            info.append('created at %s:%s' % (frame[0], frame[1]))

        if self._state == futures._FINISHED:
            info.append(self._format_result())

        if self._callbacks:
            info.append(self._format_callbacks())

        if self._fut_waiter is not None:
            info.append('wait_for=%r' % self._fut_waiter)

        return '<%s %s>' % (self.__class__.__name__, ' '.join(info))

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is active, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        frames = []
        f = self._coro.gi_frame
        if f is not None:
            # Walk from the newest frame outwards, then flip the list so
            # that it is ordered from oldest to newest.
            while f is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(f)
                f = f.f_back
            frames.reverse()
        elif self._exception is not None:
            # A traceback is already linked from oldest to newest.
            tb = self._exception.__traceback__
            while tb is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(tb.tb_frame)
                tb = tb.tb_next
        return frames

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output goes; by default it goes to sys.stderr.
        """
        extracted_list = []
        checked = set()
        for f in self.get_stack(limit=limit):
            lineno = f.f_lineno
            co = f.f_code
            filename = co.co_filename
            name = co.co_name
            if filename not in checked:
                # Refresh the line cache only once per file.
                checked.add(filename)
                linecache.checkcache(filename)
            line = linecache.getline(filename, lineno, f.f_globals)
            extracted_list.append((filename, lineno, name, line))
        exc = self._exception
        if not extracted_list:
            print('No stack for %r' % self, file=file)
        elif exc is not None:
            print('Traceback for %r (most recent call last):' % self,
                  file=file)
        else:
            print('Stack for %r (most recent call last):' % self,
                  file=file)
        traceback.print_list(extracted_list, file=file)
        if exc is not None:
            for line in traceback.format_exception_only(exc.__class__, exc):
                print(line, file=file, end='')

    def cancel(self):
        """Request this task to cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Contrary to Future.cancel(), this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing it
        completely.  The task may also return a value or raise a
        different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).
        """
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, value=None, exc=None):
        """Advance the wrapped coroutine by one step.

        Throws *exc* into the coroutine when it is given, otherwise
        sends *value* (or calls next() when *value* is None).  A
        pending cancellation request replaces *exc* with a
        CancelledError.  Depending on what the coroutine does, the
        task then finishes with a result or exception, is cancelled,
        starts waiting on the yielded future, or reschedules itself.
        """
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(value).
        try:
            if exc is not None:
                result = coro.throw(exc)
            elif value is not None:
                result = coro.send(value)
            else:
                result = next(coro)
        except StopIteration as exc:
            self.set_result(exc.value)
        except futures.CancelledError as exc:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            self.set_exception(exc)
            raise
        else:
            if isinstance(result, futures.Future):
                # Yielded Future must come from Future.__iter__().
                if result._blocking:
                    result._blocking = False
                    result.add_done_callback(self._wakeup)
                    self._fut_waiter = result
                    if self._must_cancel:
                        if self._fut_waiter.cancel():
                            self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step, None,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        """Done-callback that resumes the task once *future* completes.

        The future's result is sent into the coroutine, or its
        exception is thrown into it, through _step().
        """
        try:
            value = future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(None, exc)
        else:
            self._step(value, None)
        self = None  # Needed to break cycles when an exception occurs.


# wait() and as_completed() similar to those in PEP 3148.

# Accepted values for the return_when argument of wait(); they are the
# very same objects as the concurrent.futures constants.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED


313@coroutine
314def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
315 """Wait for the Futures and coroutines given by fs to complete.
316
Victor Stinnerdb74d982014-06-10 11:16:05 +0200317 The sequence futures must not be empty.
318
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700319 Coroutines will be wrapped in Tasks.
320
321 Returns two sets of Future: (done, pending).
322
323 Usage:
324
325 done, pending = yield from asyncio.wait(fs)
326
327 Note: This does not raise TimeoutError! Futures that aren't done
328 when the timeout occurs are returned in the second set.
329 """
Victor Stinnerf951d282014-06-29 00:46:45 +0200330 if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100331 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700332 if not fs:
333 raise ValueError('Set of coroutines/Futures is empty.')
334
335 if loop is None:
336 loop = events.get_event_loop()
337
Yury Selivanov622be342014-02-06 22:06:16 -0500338 fs = {async(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700339
340 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
341 raise ValueError('Invalid return_when value: {}'.format(return_when))
342 return (yield from _wait(fs, timeout, return_when, loop))
343
344
def _release_waiter(waiter, value=True, *args):
    """Set *value* as the result of *waiter* unless it is already done.

    Extra positional arguments are accepted and ignored so that this
    can be used directly as a done-callback.
    """
    if not waiter.done():
        waiter.set_result(value)


350@coroutine
351def wait_for(fut, timeout, *, loop=None):
352 """Wait for the single Future or coroutine to complete, with timeout.
353
354 Coroutine will be wrapped in Task.
355
Victor Stinner421e49b2014-01-23 17:40:59 +0100356 Returns result of the Future or coroutine. When a timeout occurs,
357 it cancels the task and raises TimeoutError. To avoid the task
358 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700359
360 Usage:
361
362 result = yield from asyncio.wait_for(fut, 10.0)
363
364 """
365 if loop is None:
366 loop = events.get_event_loop()
367
Guido van Rossum48c66c32014-01-29 14:30:38 -0800368 if timeout is None:
369 return (yield from fut)
370
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700371 waiter = futures.Future(loop=loop)
372 timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False)
373 cb = functools.partial(_release_waiter, waiter, True)
374
375 fut = async(fut, loop=loop)
376 fut.add_done_callback(cb)
377
378 try:
379 if (yield from waiter):
380 return fut.result()
381 else:
382 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100383 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700384 raise futures.TimeoutError()
385 finally:
386 timeout_handle.cancel()
387
388
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a non-empty collection of Futures.

    Returns a (done, pending) pair of sets once the condition selected
    by return_when is met or the timeout expires.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = futures.Future(loop=loop)
    timeout_handle = None
    if timeout is not None:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)

    def _on_completion(f):
        # Release the waiter once the last future completes, or earlier
        # when the return_when condition is satisfied by f.
        nonlocal counter
        counter -= 1
        if (counter <= 0 or
            return_when == FIRST_COMPLETED or
            return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                f.exception() is not None)):
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(False)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()
        # Detach on every exit path, so that the callbacks do not stay
        # attached to the futures when the wait itself is interrupted.
        for f in fs:
            f.remove_done_callback(_on_completion)

    done, pending = set(), set()
    for f in fs:
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending


433# This is *not* a @coroutine! It is just an iterator (yielding Futures).
434def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800435 """Return an iterator whose values are coroutines.
436
437 When waiting for the yielded coroutines you'll get the results (or
438 exceptions!) of the original Futures (or coroutines), in the order
439 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700440
441 This differs from PEP 3148; the proper way to use this is:
442
443 for f in as_completed(fs):
444 result = yield from f # The 'yield from' may raise.
445 # Use result.
446
Guido van Rossumb58f0532014-02-12 17:58:19 -0800447 If a timeout is specified, the 'yield from' will raise
448 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700449
450 Note: The futures 'f' are not necessarily members of fs.
451 """
Victor Stinnerf951d282014-06-29 00:46:45 +0200452 if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100453 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700454 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov622be342014-02-06 22:06:16 -0500455 todo = {async(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800456 from .queues import Queue # Import here to avoid circular import problem.
457 done = Queue(loop=loop)
458 timeout_handle = None
459
460 def _on_timeout():
461 for f in todo:
462 f.remove_done_callback(_on_completion)
463 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
464 todo.clear() # Can't do todo.remove(f) in the loop.
465
466 def _on_completion(f):
467 if not todo:
468 return # _on_timeout() was here first.
469 todo.remove(f)
470 done.put_nowait(f)
471 if not todo and timeout_handle is not None:
472 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700473
474 @coroutine
475 def _wait_for_one():
Guido van Rossumb58f0532014-02-12 17:58:19 -0800476 f = yield from done.get()
477 if f is None:
478 # Dummy value from _on_timeout().
479 raise futures.TimeoutError
480 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700481
Guido van Rossumb58f0532014-02-12 17:58:19 -0800482 for f in todo:
483 f.add_done_callback(_on_completion)
484 if todo and timeout is not None:
485 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700486 for _ in range(len(todo)):
487 yield _wait_for_one()
488
489
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    The value of *result* is returned to the caller once the delay
    has elapsed.
    """
    future = futures.Future(loop=loop)
    h = future._loop.call_later(delay,
                                future._set_result_unless_cancelled, result)
    try:
        return (yield from future)
    finally:
        # Cancel the timer on every exit path.
        h.cancel()


502def async(coro_or_future, *, loop=None):
503 """Wrap a coroutine in a future.
504
505 If the argument is a Future, it is returned directly.
506 """
507 if isinstance(coro_or_future, futures.Future):
508 if loop is not None and loop is not coro_or_future._loop:
509 raise ValueError('loop argument must agree with Future')
510 return coro_or_future
Victor Stinnerf951d282014-06-29 00:46:45 +0200511 elif coroutines.iscoroutine(coro_or_future):
Victor Stinner896a25a2014-07-08 11:29:25 +0200512 if loop is None:
513 loop = events.get_event_loop()
514 task = loop.create_task(coro_or_future)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200515 if task._source_traceback:
516 del task._source_traceback[-1]
517 return task
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700518 else:
519 raise TypeError('A Future or coroutine is required')
520
521
class _GatheringFuture(futures.Future):
    """Helper for gather().

    This overrides cancel() to cancel all the children and act more
    like Task.cancel(), which doesn't immediately mark itself as
    cancelled.
    """

    def __init__(self, children, *, loop=None):
        """Remember *children*, the futures that cancel() forwards to."""
        super().__init__(loop=loop)
        self._children = children

    def cancel(self):
        """Cancel every child; return False if this future is done."""
        if self.done():
            return False
        for child in self._children:
            child.cancel()
        return True


542def gather(*coros_or_futures, loop=None, return_exceptions=False):
543 """Return a future aggregating results from the given coroutines
544 or futures.
545
546 All futures must share the same event loop. If all the tasks are
547 done successfully, the returned future's result is the list of
548 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500549 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700550 exceptions in the tasks are treated the same as successful
551 results, and gathered in the result list; otherwise, the first
552 raised exception will be immediately propagated to the returned
553 future.
554
555 Cancellation: if the outer Future is cancelled, all children (that
556 have not completed yet) are also cancelled. If any child is
557 cancelled, this is treated as if it raised CancelledError --
558 the outer Future is *not* cancelled in this case. (This is to
559 prevent the cancellation of one child to cause other children to
560 be cancelled.)
561 """
Yury Selivanov622be342014-02-06 22:06:16 -0500562 arg_to_fut = {arg: async(arg, loop=loop) for arg in set(coros_or_futures)}
563 children = [arg_to_fut[arg] for arg in coros_or_futures]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700564 n = len(children)
565 if n == 0:
566 outer = futures.Future(loop=loop)
567 outer.set_result([])
568 return outer
569 if loop is None:
570 loop = children[0]._loop
571 for fut in children:
572 if fut._loop is not loop:
573 raise ValueError("futures are tied to different event loops")
574 outer = _GatheringFuture(children, loop=loop)
575 nfinished = 0
576 results = [None] * n
577
578 def _done_callback(i, fut):
579 nonlocal nfinished
580 if outer._state != futures._PENDING:
581 if fut._exception is not None:
582 # Mark exception retrieved.
583 fut.exception()
584 return
585 if fut._state == futures._CANCELLED:
586 res = futures.CancelledError()
587 if not return_exceptions:
588 outer.set_exception(res)
589 return
590 elif fut._exception is not None:
591 res = fut.exception() # Mark exception retrieved.
592 if not return_exceptions:
593 outer.set_exception(res)
594 return
595 else:
596 res = fut._result
597 results[i] = res
598 nfinished += 1
599 if nfinished == n:
600 outer.set_result(results)
601
602 for i, fut in enumerate(children):
603 fut.add_done_callback(functools.partial(_done_callback, i))
604 return outer
605
606
607def shield(arg, *, loop=None):
608 """Wait for a future, shielding it from cancellation.
609
610 The statement
611
612 res = yield from shield(something())
613
614 is exactly equivalent to the statement
615
616 res = yield from something()
617
618 *except* that if the coroutine containing it is cancelled, the
619 task running in something() is not cancelled. From the POV of
620 something(), the cancellation did not happen. But its caller is
621 still cancelled, so the yield-from expression still raises
622 CancelledError. Note: If something() is cancelled by other means
623 this will still cancel shield().
624
625 If you want to completely ignore cancellation (not recommended)
626 you can combine shield() with a try/except clause, as follows:
627
628 try:
629 res = yield from shield(something())
630 except CancelledError:
631 res = None
632 """
633 inner = async(arg, loop=loop)
634 if inner.done():
635 # Shortcut.
636 return inner
637 loop = inner._loop
638 outer = futures.Future(loop=loop)
639
640 def _done_callback(inner):
641 if outer.cancelled():
642 # Mark inner's result as retrieved.
643 inner.cancelled() or inner.exception()
644 return
645 if inner.cancelled():
646 outer.cancel()
647 else:
648 exc = inner.exception()
649 if exc is not None:
650 outer.set_exception(exc)
651 else:
652 outer.set_result(inner.result())
653
654 inner.add_done_callback(_done_callback)
655 return outer