blob: 07952c9a64f6f9ae896a01081064f5b76866db66 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
Victor Stinnerf951d282014-06-29 00:46:45 +02003__all__ = ['Task',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
5 'wait', 'wait_for', 'as_completed', 'sleep', 'async',
Guido van Rossumde3a1362013-11-29 09:29:00 -08006 'gather', 'shield',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007 ]
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import concurrent.futures
10import functools
11import inspect
12import linecache
Victor Stinner7ef60cd2014-02-19 23:15:02 +010013import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import traceback
15import weakref
16
Victor Stinnerf951d282014-06-29 00:46:45 +020017from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import events
19from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020020from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021
Victor Stinnera02f81f2014-06-24 22:37:53 +020022_PY34 = (sys.version_info >= (3, 4))
Victor Stinner8d3e02e2014-06-18 01:14:59 +020023
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025class Task(futures.Future):
26 """A coroutine wrapped in a Future."""
27
28 # An important invariant maintained while a Task not done:
29 #
30 # - Either _fut_waiter is None, and _step() is scheduled;
31 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
32 #
33 # The only transition from the latter to the former is through
34 # _wakeup(). When _fut_waiter is not None, one of its callbacks
35 # must be _wakeup().
36
37 # Weak set containing all tasks alive.
38 _all_tasks = weakref.WeakSet()
39
Guido van Rossum1a605ed2013-12-06 12:57:40 -080040 # Dictionary containing tasks that are currently active in
41 # all running event loops. {EventLoop: Task}
42 _current_tasks = {}
43
44 @classmethod
45 def current_task(cls, loop=None):
46 """Return the currently running task in an event loop or None.
47
48 By default the current task for the current event loop is returned.
49
50 None is returned when called not in the context of a Task.
51 """
52 if loop is None:
53 loop = events.get_event_loop()
54 return cls._current_tasks.get(loop)
55
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070056 @classmethod
57 def all_tasks(cls, loop=None):
58 """Return a set of all tasks for an event loop.
59
60 By default all tasks for the current event loop are returned.
61 """
62 if loop is None:
63 loop = events.get_event_loop()
64 return {t for t in cls._all_tasks if t._loop is loop}
65
66 def __init__(self, coro, *, loop=None):
Victor Stinnerf951d282014-06-29 00:46:45 +020067 assert coroutines.iscoroutine(coro), repr(coro) # Not a coroutine function!
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070068 super().__init__(loop=loop)
Victor Stinner80f53aa2014-06-27 13:52:20 +020069 if self._source_traceback:
70 del self._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070071 self._coro = iter(coro) # Use the iterator just in case.
72 self._fut_waiter = None
73 self._must_cancel = False
74 self._loop.call_soon(self._step)
75 self.__class__._all_tasks.add(self)
Victor Stinner98b63912014-06-30 14:51:04 +020076 # If False, don't log a message if the task is destroyed whereas its
77 # status is still pending
78 self._log_destroy_pending = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070079
Victor Stinnera02f81f2014-06-24 22:37:53 +020080 # On Python 3.3 or older, objects with a destructor part of a reference
81 # cycle are never destroyed. It's not more the case on Python 3.4 thanks to
82 # the PEP 442.
83 if _PY34:
84 def __del__(self):
Victor Stinner98b63912014-06-30 14:51:04 +020085 if self._state == futures._PENDING and self._log_destroy_pending:
Victor Stinner80f53aa2014-06-27 13:52:20 +020086 context = {
Victor Stinnera02f81f2014-06-24 22:37:53 +020087 'task': self,
88 'message': 'Task was destroyed but it is pending!',
Victor Stinner80f53aa2014-06-27 13:52:20 +020089 }
90 if self._source_traceback:
91 context['source_traceback'] = self._source_traceback
92 self._loop.call_exception_handler(context)
Victor Stinnera02f81f2014-06-24 22:37:53 +020093 futures.Future.__del__(self)
94
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070095 def __repr__(self):
Victor Stinner975735f2014-06-25 21:41:58 +020096 info = []
97 if self._must_cancel:
98 info.append('cancelling')
99 else:
100 info.append(self._state.lower())
101
Victor Stinnerc39ba7d2014-07-11 00:21:27 +0200102 coro = coroutines._format_coroutine(self._coro)
103 info.append('coro=<%s>' % coro)
104
105 if self._source_traceback:
106 frame = self._source_traceback[-1]
107 info.append('created at %s:%s' % (frame[0], frame[1]))
Victor Stinner975735f2014-06-25 21:41:58 +0200108
109 if self._state == futures._FINISHED:
110 info.append(self._format_result())
111
112 if self._callbacks:
113 info.append(self._format_callbacks())
114
Victor Stinner2dba23a2014-07-03 00:59:00 +0200115 if self._fut_waiter is not None:
116 info.append('wait_for=%r' % self._fut_waiter)
117
Victor Stinner975735f2014-06-25 21:41:58 +0200118 return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700119
120 def get_stack(self, *, limit=None):
121 """Return the list of stack frames for this task's coroutine.
122
123 If the coroutine is active, this returns the stack where it is
124 suspended. If the coroutine has completed successfully or was
125 cancelled, this returns an empty list. If the coroutine was
126 terminated by an exception, this returns the list of traceback
127 frames.
128
129 The frames are always ordered from oldest to newest.
130
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500131 The optional limit gives the maximum number of frames to
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700132 return; by default all available frames are returned. Its
133 meaning differs depending on whether a stack or a traceback is
134 returned: the newest frames of a stack are returned, but the
135 oldest frames of a traceback are returned. (This matches the
136 behavior of the traceback module.)
137
138 For reasons beyond our control, only one stack frame is
139 returned for a suspended coroutine.
140 """
141 frames = []
142 f = self._coro.gi_frame
143 if f is not None:
144 while f is not None:
145 if limit is not None:
146 if limit <= 0:
147 break
148 limit -= 1
149 frames.append(f)
150 f = f.f_back
151 frames.reverse()
152 elif self._exception is not None:
153 tb = self._exception.__traceback__
154 while tb is not None:
155 if limit is not None:
156 if limit <= 0:
157 break
158 limit -= 1
159 frames.append(tb.tb_frame)
160 tb = tb.tb_next
161 return frames
162
163 def print_stack(self, *, limit=None, file=None):
164 """Print the stack or traceback for this task's coroutine.
165
166 This produces output similar to that of the traceback module,
167 for the frames retrieved by get_stack(). The limit argument
168 is passed to get_stack(). The file argument is an I/O stream
169 to which the output goes; by default it goes to sys.stderr.
170 """
171 extracted_list = []
172 checked = set()
173 for f in self.get_stack(limit=limit):
174 lineno = f.f_lineno
175 co = f.f_code
176 filename = co.co_filename
177 name = co.co_name
178 if filename not in checked:
179 checked.add(filename)
180 linecache.checkcache(filename)
181 line = linecache.getline(filename, lineno, f.f_globals)
182 extracted_list.append((filename, lineno, name, line))
183 exc = self._exception
184 if not extracted_list:
185 print('No stack for %r' % self, file=file)
186 elif exc is not None:
187 print('Traceback for %r (most recent call last):' % self,
188 file=file)
189 else:
190 print('Stack for %r (most recent call last):' % self,
191 file=file)
192 traceback.print_list(extracted_list, file=file)
193 if exc is not None:
194 for line in traceback.format_exception_only(exc.__class__, exc):
195 print(line, file=file, end='')
196
197 def cancel(self):
Victor Stinner8d213572014-06-02 23:06:46 +0200198 """Request this task to cancel itself.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200199
Victor Stinner8d213572014-06-02 23:06:46 +0200200 This arranges for a CancelledError to be thrown into the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200201 wrapped coroutine on the next cycle through the event loop.
202 The coroutine then has a chance to clean up or even deny
203 the request using try/except/finally.
204
205 Contrary to Future.cancel(), this does not guarantee that the
206 task will be cancelled: the exception might be caught and
207 acted upon, delaying cancellation of the task or preventing it
208 completely. The task may also return a value or raise a
209 different exception.
210
211 Immediately after this method is called, Task.cancelled() will
212 not return True (unless the task was already cancelled). A
213 task will be marked as cancelled when the wrapped coroutine
214 terminates with a CancelledError exception (even if cancel()
215 was not called).
216 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700217 if self.done():
218 return False
219 if self._fut_waiter is not None:
220 if self._fut_waiter.cancel():
221 # Leave self._fut_waiter; it may be a Task that
222 # catches and ignores the cancellation so we may have
223 # to cancel it again later.
224 return True
225 # It must be the case that self._step is already scheduled.
226 self._must_cancel = True
227 return True
228
229 def _step(self, value=None, exc=None):
230 assert not self.done(), \
231 '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
232 if self._must_cancel:
233 if not isinstance(exc, futures.CancelledError):
234 exc = futures.CancelledError()
235 self._must_cancel = False
236 coro = self._coro
237 self._fut_waiter = None
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800238
239 self.__class__._current_tasks[self._loop] = self
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700240 # Call either coro.throw(exc) or coro.send(value).
241 try:
242 if exc is not None:
243 result = coro.throw(exc)
244 elif value is not None:
245 result = coro.send(value)
246 else:
247 result = next(coro)
248 except StopIteration as exc:
249 self.set_result(exc.value)
250 except futures.CancelledError as exc:
251 super().cancel() # I.e., Future.cancel(self).
252 except Exception as exc:
253 self.set_exception(exc)
254 except BaseException as exc:
255 self.set_exception(exc)
256 raise
257 else:
258 if isinstance(result, futures.Future):
259 # Yielded Future must come from Future.__iter__().
260 if result._blocking:
261 result._blocking = False
262 result.add_done_callback(self._wakeup)
263 self._fut_waiter = result
264 if self._must_cancel:
265 if self._fut_waiter.cancel():
266 self._must_cancel = False
267 else:
268 self._loop.call_soon(
269 self._step, None,
270 RuntimeError(
271 'yield was used instead of yield from '
272 'in task {!r} with {!r}'.format(self, result)))
273 elif result is None:
274 # Bare yield relinquishes control for one event loop iteration.
275 self._loop.call_soon(self._step)
276 elif inspect.isgenerator(result):
277 # Yielding a generator is just wrong.
278 self._loop.call_soon(
279 self._step, None,
280 RuntimeError(
281 'yield was used instead of yield from for '
282 'generator in task {!r} with {}'.format(
283 self, result)))
284 else:
285 # Yielding something else is an error.
286 self._loop.call_soon(
287 self._step, None,
288 RuntimeError(
289 'Task got bad yield: {!r}'.format(result)))
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800290 finally:
291 self.__class__._current_tasks.pop(self._loop)
Victor Stinnerd74ac822014-03-04 23:07:08 +0100292 self = None # Needed to break cycles when an exception occurs.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700293
294 def _wakeup(self, future):
295 try:
296 value = future.result()
297 except Exception as exc:
298 # This may also be a cancellation.
299 self._step(None, exc)
300 else:
301 self._step(value, None)
302 self = None # Needed to break cycles when an exception occurs.
303
304
305# wait() and as_completed() similar to those in PEP 3148.
306
307FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
308FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
309ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
310
311
312@coroutine
313def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
314 """Wait for the Futures and coroutines given by fs to complete.
315
Victor Stinnerdb74d982014-06-10 11:16:05 +0200316 The sequence futures must not be empty.
317
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700318 Coroutines will be wrapped in Tasks.
319
320 Returns two sets of Future: (done, pending).
321
322 Usage:
323
324 done, pending = yield from asyncio.wait(fs)
325
326 Note: This does not raise TimeoutError! Futures that aren't done
327 when the timeout occurs are returned in the second set.
328 """
Victor Stinnerf951d282014-06-29 00:46:45 +0200329 if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100330 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700331 if not fs:
332 raise ValueError('Set of coroutines/Futures is empty.')
Victor Stinnere931f7b2014-07-16 18:50:39 +0200333 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
334 raise ValueError('Invalid return_when value: {}'.format(return_when))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700335
336 if loop is None:
337 loop = events.get_event_loop()
338
Yury Selivanov622be342014-02-06 22:06:16 -0500339 fs = {async(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700340
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700341 return (yield from _wait(fs, timeout, return_when, loop))
342
343
344def _release_waiter(waiter, value=True, *args):
345 if not waiter.done():
346 waiter.set_result(value)
347
348
349@coroutine
350def wait_for(fut, timeout, *, loop=None):
351 """Wait for the single Future or coroutine to complete, with timeout.
352
353 Coroutine will be wrapped in Task.
354
Victor Stinner421e49b2014-01-23 17:40:59 +0100355 Returns result of the Future or coroutine. When a timeout occurs,
356 it cancels the task and raises TimeoutError. To avoid the task
357 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700358
359 Usage:
360
361 result = yield from asyncio.wait_for(fut, 10.0)
362
363 """
364 if loop is None:
365 loop = events.get_event_loop()
366
Guido van Rossum48c66c32014-01-29 14:30:38 -0800367 if timeout is None:
368 return (yield from fut)
369
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700370 waiter = futures.Future(loop=loop)
371 timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False)
372 cb = functools.partial(_release_waiter, waiter, True)
373
374 fut = async(fut, loop=loop)
375 fut.add_done_callback(cb)
376
377 try:
378 if (yield from waiter):
379 return fut.result()
380 else:
381 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100382 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700383 raise futures.TimeoutError()
384 finally:
385 timeout_handle.cancel()
386
387
388@coroutine
389def _wait(fs, timeout, return_when, loop):
390 """Internal helper for wait() and _wait_for().
391
392 The fs argument must be a collection of Futures.
393 """
394 assert fs, 'Set of Futures is empty.'
395 waiter = futures.Future(loop=loop)
396 timeout_handle = None
397 if timeout is not None:
398 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
399 counter = len(fs)
400
401 def _on_completion(f):
402 nonlocal counter
403 counter -= 1
404 if (counter <= 0 or
405 return_when == FIRST_COMPLETED or
406 return_when == FIRST_EXCEPTION and (not f.cancelled() and
407 f.exception() is not None)):
408 if timeout_handle is not None:
409 timeout_handle.cancel()
410 if not waiter.done():
411 waiter.set_result(False)
412
413 for f in fs:
414 f.add_done_callback(_on_completion)
415
416 try:
417 yield from waiter
418 finally:
419 if timeout_handle is not None:
420 timeout_handle.cancel()
421
422 done, pending = set(), set()
423 for f in fs:
424 f.remove_done_callback(_on_completion)
425 if f.done():
426 done.add(f)
427 else:
428 pending.add(f)
429 return done, pending
430
431
432# This is *not* a @coroutine! It is just an iterator (yielding Futures).
433def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800434 """Return an iterator whose values are coroutines.
435
436 When waiting for the yielded coroutines you'll get the results (or
437 exceptions!) of the original Futures (or coroutines), in the order
438 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700439
440 This differs from PEP 3148; the proper way to use this is:
441
442 for f in as_completed(fs):
443 result = yield from f # The 'yield from' may raise.
444 # Use result.
445
Guido van Rossumb58f0532014-02-12 17:58:19 -0800446 If a timeout is specified, the 'yield from' will raise
447 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700448
449 Note: The futures 'f' are not necessarily members of fs.
450 """
Victor Stinnerf951d282014-06-29 00:46:45 +0200451 if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100452 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700453 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov622be342014-02-06 22:06:16 -0500454 todo = {async(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800455 from .queues import Queue # Import here to avoid circular import problem.
456 done = Queue(loop=loop)
457 timeout_handle = None
458
459 def _on_timeout():
460 for f in todo:
461 f.remove_done_callback(_on_completion)
462 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
463 todo.clear() # Can't do todo.remove(f) in the loop.
464
465 def _on_completion(f):
466 if not todo:
467 return # _on_timeout() was here first.
468 todo.remove(f)
469 done.put_nowait(f)
470 if not todo and timeout_handle is not None:
471 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700472
473 @coroutine
474 def _wait_for_one():
Guido van Rossumb58f0532014-02-12 17:58:19 -0800475 f = yield from done.get()
476 if f is None:
477 # Dummy value from _on_timeout().
478 raise futures.TimeoutError
479 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700480
Guido van Rossumb58f0532014-02-12 17:58:19 -0800481 for f in todo:
482 f.add_done_callback(_on_completion)
483 if todo and timeout is not None:
484 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700485 for _ in range(len(todo)):
486 yield _wait_for_one()
487
488
489@coroutine
490def sleep(delay, result=None, *, loop=None):
491 """Coroutine that completes after a given time (in seconds)."""
492 future = futures.Future(loop=loop)
Victor Stinnera9acbe82014-07-05 15:29:41 +0200493 h = future._loop.call_later(delay,
494 future._set_result_unless_cancelled, result)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700495 try:
496 return (yield from future)
497 finally:
498 h.cancel()
499
500
501def async(coro_or_future, *, loop=None):
502 """Wrap a coroutine in a future.
503
504 If the argument is a Future, it is returned directly.
505 """
506 if isinstance(coro_or_future, futures.Future):
507 if loop is not None and loop is not coro_or_future._loop:
508 raise ValueError('loop argument must agree with Future')
509 return coro_or_future
Victor Stinnerf951d282014-06-29 00:46:45 +0200510 elif coroutines.iscoroutine(coro_or_future):
Victor Stinner896a25a2014-07-08 11:29:25 +0200511 if loop is None:
512 loop = events.get_event_loop()
513 task = loop.create_task(coro_or_future)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200514 if task._source_traceback:
515 del task._source_traceback[-1]
516 return task
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700517 else:
518 raise TypeError('A Future or coroutine is required')
519
520
521class _GatheringFuture(futures.Future):
522 """Helper for gather().
523
524 This overrides cancel() to cancel all the children and act more
525 like Task.cancel(), which doesn't immediately mark itself as
526 cancelled.
527 """
528
529 def __init__(self, children, *, loop=None):
530 super().__init__(loop=loop)
531 self._children = children
532
533 def cancel(self):
534 if self.done():
535 return False
536 for child in self._children:
537 child.cancel()
538 return True
539
540
541def gather(*coros_or_futures, loop=None, return_exceptions=False):
542 """Return a future aggregating results from the given coroutines
543 or futures.
544
545 All futures must share the same event loop. If all the tasks are
546 done successfully, the returned future's result is the list of
547 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500548 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700549 exceptions in the tasks are treated the same as successful
550 results, and gathered in the result list; otherwise, the first
551 raised exception will be immediately propagated to the returned
552 future.
553
554 Cancellation: if the outer Future is cancelled, all children (that
555 have not completed yet) are also cancelled. If any child is
556 cancelled, this is treated as if it raised CancelledError --
557 the outer Future is *not* cancelled in this case. (This is to
558 prevent the cancellation of one child to cause other children to
559 be cancelled.)
560 """
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200561 if not coros_or_futures:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700562 outer = futures.Future(loop=loop)
563 outer.set_result([])
564 return outer
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200565
566 arg_to_fut = {}
567 for arg in set(coros_or_futures):
568 if not isinstance(arg, futures.Future):
569 fut = async(arg, loop=loop)
570 if loop is None:
571 loop = fut._loop
572 # The caller cannot control this future, the "destroy pending task"
573 # warning should not be emitted.
574 fut._log_destroy_pending = False
575 else:
576 fut = arg
577 if loop is None:
578 loop = fut._loop
579 elif fut._loop is not loop:
580 raise ValueError("futures are tied to different event loops")
581 arg_to_fut[arg] = fut
582
583 children = [arg_to_fut[arg] for arg in coros_or_futures]
584 nchildren = len(children)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700585 outer = _GatheringFuture(children, loop=loop)
586 nfinished = 0
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200587 results = [None] * nchildren
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700588
589 def _done_callback(i, fut):
590 nonlocal nfinished
591 if outer._state != futures._PENDING:
592 if fut._exception is not None:
593 # Mark exception retrieved.
594 fut.exception()
595 return
596 if fut._state == futures._CANCELLED:
597 res = futures.CancelledError()
598 if not return_exceptions:
599 outer.set_exception(res)
600 return
601 elif fut._exception is not None:
602 res = fut.exception() # Mark exception retrieved.
603 if not return_exceptions:
604 outer.set_exception(res)
605 return
606 else:
607 res = fut._result
608 results[i] = res
609 nfinished += 1
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200610 if nfinished == nchildren:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700611 outer.set_result(results)
612
613 for i, fut in enumerate(children):
614 fut.add_done_callback(functools.partial(_done_callback, i))
615 return outer
616
617
618def shield(arg, *, loop=None):
619 """Wait for a future, shielding it from cancellation.
620
621 The statement
622
623 res = yield from shield(something())
624
625 is exactly equivalent to the statement
626
627 res = yield from something()
628
629 *except* that if the coroutine containing it is cancelled, the
630 task running in something() is not cancelled. From the POV of
631 something(), the cancellation did not happen. But its caller is
632 still cancelled, so the yield-from expression still raises
633 CancelledError. Note: If something() is cancelled by other means
634 this will still cancel shield().
635
636 If you want to completely ignore cancellation (not recommended)
637 you can combine shield() with a try/except clause, as follows:
638
639 try:
640 res = yield from shield(something())
641 except CancelledError:
642 res = None
643 """
644 inner = async(arg, loop=loop)
645 if inner.done():
646 # Shortcut.
647 return inner
648 loop = inner._loop
649 outer = futures.Future(loop=loop)
650
651 def _done_callback(inner):
652 if outer.cancelled():
653 # Mark inner's result as retrieved.
654 inner.cancelled() or inner.exception()
655 return
656 if inner.cancelled():
657 outer.cancel()
658 else:
659 exc = inner.exception()
660 if exc is not None:
661 outer.set_exception(exc)
662 else:
663 outer.set_result(inner.result())
664
665 inner.add_done_callback(_done_callback)
666 return outer