blob: e0738021e03c78282e2b32fa6eaef78896278b15 [file] [log] [blame]
"""Support for tasks, coroutines and the scheduler."""

# Names exported by a star-import of this module.
__all__ = [
    'Task',
    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
    'wait', 'wait_for', 'as_completed', 'sleep', 'async',
    'gather', 'shield',
]
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import concurrent.futures
10import functools
11import inspect
12import linecache
Victor Stinner7ef60cd2014-02-19 23:15:02 +010013import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import traceback
15import weakref
16
Victor Stinnerf951d282014-06-29 00:46:45 +020017from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import events
19from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020020from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021
# True when the running interpreter is Python 3.4 or newer.
_PY34 = sys.version_info >= (3, 4)
Victor Stinner8d3e02e2014-06-18 01:14:59 +020023
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024
class Task(futures.Future):
    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task is not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # Weak set containing all tasks alive.
    _all_tasks = weakref.WeakSet()

    # Dictionary containing tasks that are currently active in
    # all running event loops.  {EventLoop: Task}
    _current_tasks = {}

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        if loop is None:
            loop = events.get_event_loop()
        return cls._current_tasks.get(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        if loop is None:
            loop = events.get_event_loop()
        return {t for t in cls._all_tasks if t._loop is loop}

    def __init__(self, coro, *, loop=None):
        """Wrap the coroutine object *coro* and schedule its first step.

        *coro* must satisfy coroutines.iscoroutine(); this is checked
        with an assert.  *loop* is forwarded to the Future constructor.
        The new task is registered in the class-wide _all_tasks set and
        its first _step() call is queued with call_soon().
        """
        assert coroutines.iscoroutine(coro), repr(coro)  # Not a coroutine function!
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the newest entry of the recorded traceback.
            # NOTE(review): presumably the frame of this constructor, so
            # the traceback points at the caller -- confirm in futures.py.
            del self._source_traceback[-1]
        self._coro = iter(coro)  # Use the iterator just in case.
        # Future this task is currently blocked on, or None (see the
        # invariant described at the top of the class).
        self._fut_waiter = None
        # Set by cancel() when the cancellation has to be delivered by
        # the next _step() call.
        self._must_cancel = False
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)
        # If False, don't log a message if the task is destroyed whereas its
        # status is still pending
        self._log_destroy_pending = True

    # On Python 3.3 or older, objects with a destructor that are part of a
    # reference cycle are never destroyed. That's not the case any more on
    # Python 3.4 thanks to the PEP 442.
    if _PY34:
        def __del__(self):
            """Report a task destroyed while pending, then defer to Future.

            The report goes through the loop's call_exception_handler()
            and is skipped when _log_destroy_pending is false.
            """
            if self._state == futures._PENDING and self._log_destroy_pending:
                context = {
                    'task': self,
                    'message': 'Task was destroyed but it is pending!',
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
            futures.Future.__del__(self)

    def _repr_info(self):
        """Return the Future repr fields extended with task details.

        Adds the formatted coroutine and, when the task is blocked on a
        future, that future; shows 'cancelling' as the status while a
        cancellation request is waiting to be delivered.
        """
        info = super()._repr_info()

        if self._must_cancel:
            # replace status
            info[0] = 'cancelling'

        coro = coroutines._format_coroutine(self._coro)
        info.insert(1, 'coro=<%s>' % coro)

        if self._fut_waiter is not None:
            info.insert(2, 'wait_for=%r' % self._fut_waiter)
        return info

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is active, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        frames = []
        f = self._coro.gi_frame
        if f is not None:
            # Follow f_back from the newest frame outwards, so a limit
            # keeps the newest frames; reverse afterwards to get
            # oldest-first order.
            while f is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(f)
                f = f.f_back
            frames.reverse()
        elif self._exception is not None:
            # Follow tb_next from the oldest traceback entry, so a limit
            # keeps the oldest frames and the order is already right.
            tb = self._exception.__traceback__
            while tb is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(tb.tb_frame)
                tb = tb.tb_next
        return frames

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        extracted_list = []
        # Filenames whose linecache entry has already been revalidated.
        checked = set()
        for f in self.get_stack(limit=limit):
            lineno = f.f_lineno
            co = f.f_code
            filename = co.co_filename
            name = co.co_name
            if filename not in checked:
                checked.add(filename)
                linecache.checkcache(filename)
            line = linecache.getline(filename, lineno, f.f_globals)
            extracted_list.append((filename, lineno, name, line))
        exc = self._exception
        if not extracted_list:
            print('No stack for %r' % self, file=file)
        elif exc is not None:
            print('Traceback for %r (most recent call last):' % self,
                  file=file)
        else:
            print('Stack for %r (most recent call last):' % self,
                  file=file)
        traceback.print_list(extracted_list, file=file)
        if exc is not None:
            for line in traceback.format_exception_only(exc.__class__, exc):
                print(line, file=file, end='')

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, value=None, exc=None):
        """Run the coroutine until its next yield or until it finishes.

        If *exc* is given it is thrown into the coroutine; otherwise
        *value* is sent (or next() is used when *value* is None).  A
        pending cancellation request replaces *exc* with CancelledError.

        When the coroutine finishes, the task's result, exception or
        cancelled state is set.  When it yields, the yielded object
        decides what happens next: wait on a Future, reschedule on a
        bare yield, or schedule a RuntimeError for anything else.
        """
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
        if self._must_cancel:
            # Deliver the cancellation requested through cancel().
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        # Mark this task as the current one for its loop while the
        # coroutine runs; undone in the finally clause below.
        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(value).
        try:
            if exc is not None:
                result = coro.throw(exc)
            elif value is not None:
                result = coro.send(value)
            else:
                result = next(coro)
        except StopIteration as exc:
            # The coroutine returned; its return value is the result.
            self.set_result(exc.value)
        except futures.CancelledError as exc:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            # Record the exception on the task, but let it propagate
            # to the caller as well.
            self.set_exception(exc)
            raise
        else:
            if isinstance(result, futures.Future):
                # Yielded Future must come from Future.__iter__().
                if result._blocking:
                    result._blocking = False
                    result.add_done_callback(self._wakeup)
                    self._fut_waiter = result
                    if self._must_cancel:
                        # cancel() was called while the coroutine was
                        # running: pass the request on to the new waiter.
                        if self._fut_waiter.cancel():
                            self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step, None,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        """Done-callback for the future this task was waiting on.

        Resumes the coroutine through _step(): with the future's result
        on success, or with the exception raised by future.result().
        """
        try:
            value = future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(None, exc)
        else:
            self._step(value, None)
        self = None  # Needed to break cycles when an exception occurs.
293
294
# wait() and as_completed() are modelled on their PEP 3148 counterparts;
# the return_when constants are the very objects concurrent.futures uses.
(FIRST_COMPLETED,
 FIRST_EXCEPTION,
 ALL_COMPLETED) = (concurrent.futures.FIRST_COMPLETED,
                   concurrent.futures.FIRST_EXCEPTION,
                   concurrent.futures.ALL_COMPLETED)
300
301
302@coroutine
303def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
304 """Wait for the Futures and coroutines given by fs to complete.
305
Victor Stinnerdb74d982014-06-10 11:16:05 +0200306 The sequence futures must not be empty.
307
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700308 Coroutines will be wrapped in Tasks.
309
310 Returns two sets of Future: (done, pending).
311
312 Usage:
313
314 done, pending = yield from asyncio.wait(fs)
315
316 Note: This does not raise TimeoutError! Futures that aren't done
317 when the timeout occurs are returned in the second set.
318 """
Victor Stinnerf951d282014-06-29 00:46:45 +0200319 if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100320 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700321 if not fs:
322 raise ValueError('Set of coroutines/Futures is empty.')
Victor Stinnere931f7b2014-07-16 18:50:39 +0200323 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
324 raise ValueError('Invalid return_when value: {}'.format(return_when))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700325
326 if loop is None:
327 loop = events.get_event_loop()
328
Yury Selivanov622be342014-02-06 22:06:16 -0500329 fs = {async(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700330
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700331 return (yield from _wait(fs, timeout, return_when, loop))
332
333
def _release_waiter(waiter, *args):
    """Complete *waiter* with a None result unless it is already done.

    Any extra positional arguments are accepted and ignored.
    """
    if waiter.done():
        return
    waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700337
338
339@coroutine
340def wait_for(fut, timeout, *, loop=None):
341 """Wait for the single Future or coroutine to complete, with timeout.
342
343 Coroutine will be wrapped in Task.
344
Victor Stinner421e49b2014-01-23 17:40:59 +0100345 Returns result of the Future or coroutine. When a timeout occurs,
346 it cancels the task and raises TimeoutError. To avoid the task
347 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700348
349 Usage:
350
351 result = yield from asyncio.wait_for(fut, 10.0)
352
353 """
354 if loop is None:
355 loop = events.get_event_loop()
356
Guido van Rossum48c66c32014-01-29 14:30:38 -0800357 if timeout is None:
358 return (yield from fut)
359
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700360 waiter = futures.Future(loop=loop)
Victor Stinner59e08022014-08-28 11:19:25 +0200361 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
362 cb = functools.partial(_release_waiter, waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700363
364 fut = async(fut, loop=loop)
365 fut.add_done_callback(cb)
366
367 try:
Victor Stinner59e08022014-08-28 11:19:25 +0200368 # wait until the future completes or the timeout
369 yield from waiter
370
371 if fut.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700372 return fut.result()
373 else:
374 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100375 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700376 raise futures.TimeoutError()
377 finally:
378 timeout_handle.cancel()
379
380
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a non-empty collection of Futures.
    Returns a (done, pending) pair of sets.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = futures.Future(loop=loop)
    timeout_handle = (loop.call_later(timeout, _release_waiter, waiter)
                      if timeout is not None else None)
    remaining = len(fs)

    def _on_completion(f):
        nonlocal remaining
        remaining -= 1
        if remaining > 0 and return_when != FIRST_COMPLETED:
            # Futures are still outstanding, so only a failure under
            # FIRST_EXCEPTION may end the wait early.
            if return_when != FIRST_EXCEPTION:
                return
            if f.cancelled() or f.exception() is None:
                return
        if timeout_handle is not None:
            timeout_handle.cancel()
        if not waiter.done():
            waiter.set_result(None)

    for fut in fs:
        fut.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()

    done, pending = set(), set()
    for fut in fs:
        fut.remove_done_callback(_on_completion)
        bucket = done if fut.done() else pending
        bucket.add(fut)
    return done, pending
423
424
425# This is *not* a @coroutine! It is just an iterator (yielding Futures).
426def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800427 """Return an iterator whose values are coroutines.
428
429 When waiting for the yielded coroutines you'll get the results (or
430 exceptions!) of the original Futures (or coroutines), in the order
431 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700432
433 This differs from PEP 3148; the proper way to use this is:
434
435 for f in as_completed(fs):
436 result = yield from f # The 'yield from' may raise.
437 # Use result.
438
Guido van Rossumb58f0532014-02-12 17:58:19 -0800439 If a timeout is specified, the 'yield from' will raise
440 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700441
442 Note: The futures 'f' are not necessarily members of fs.
443 """
Victor Stinnerf951d282014-06-29 00:46:45 +0200444 if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100445 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700446 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov622be342014-02-06 22:06:16 -0500447 todo = {async(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800448 from .queues import Queue # Import here to avoid circular import problem.
449 done = Queue(loop=loop)
450 timeout_handle = None
451
452 def _on_timeout():
453 for f in todo:
454 f.remove_done_callback(_on_completion)
455 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
456 todo.clear() # Can't do todo.remove(f) in the loop.
457
458 def _on_completion(f):
459 if not todo:
460 return # _on_timeout() was here first.
461 todo.remove(f)
462 done.put_nowait(f)
463 if not todo and timeout_handle is not None:
464 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700465
466 @coroutine
467 def _wait_for_one():
Guido van Rossumb58f0532014-02-12 17:58:19 -0800468 f = yield from done.get()
469 if f is None:
470 # Dummy value from _on_timeout().
471 raise futures.TimeoutError
472 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700473
Guido van Rossumb58f0532014-02-12 17:58:19 -0800474 for f in todo:
475 f.add_done_callback(_on_completion)
476 if todo and timeout is not None:
477 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700478 for _ in range(len(todo)):
479 yield _wait_for_one()
480
481
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    *result* is handed back to the caller once the delay has elapsed.
    The timer is cancelled on the way out, whatever the outcome.
    """
    wakeup = futures.Future(loop=loop)
    timer = wakeup._loop.call_later(
        delay, wakeup._set_result_unless_cancelled, result)
    try:
        outcome = yield from wakeup
    finally:
        timer.cancel()
    return outcome
492
493
494def async(coro_or_future, *, loop=None):
495 """Wrap a coroutine in a future.
496
497 If the argument is a Future, it is returned directly.
498 """
499 if isinstance(coro_or_future, futures.Future):
500 if loop is not None and loop is not coro_or_future._loop:
501 raise ValueError('loop argument must agree with Future')
502 return coro_or_future
Victor Stinnerf951d282014-06-29 00:46:45 +0200503 elif coroutines.iscoroutine(coro_or_future):
Victor Stinner896a25a2014-07-08 11:29:25 +0200504 if loop is None:
505 loop = events.get_event_loop()
506 task = loop.create_task(coro_or_future)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200507 if task._source_traceback:
508 del task._source_traceback[-1]
509 return task
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700510 else:
511 raise TypeError('A Future or coroutine is required')
512
513
class _GatheringFuture(futures.Future):
    """Helper for gather().

    Its cancel() forwards the request to every child and, like
    Task.cancel(), does not immediately mark this future itself as
    cancelled.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        # The futures whose results are being aggregated.
        self._children = children

    def cancel(self):
        """Cancel all children; return False if already done, else True."""
        if not self.done():
            for child in self._children:
                child.cancel()
            return True
        return False
532
533
534def gather(*coros_or_futures, loop=None, return_exceptions=False):
535 """Return a future aggregating results from the given coroutines
536 or futures.
537
538 All futures must share the same event loop. If all the tasks are
539 done successfully, the returned future's result is the list of
540 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500541 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700542 exceptions in the tasks are treated the same as successful
543 results, and gathered in the result list; otherwise, the first
544 raised exception will be immediately propagated to the returned
545 future.
546
547 Cancellation: if the outer Future is cancelled, all children (that
548 have not completed yet) are also cancelled. If any child is
549 cancelled, this is treated as if it raised CancelledError --
550 the outer Future is *not* cancelled in this case. (This is to
551 prevent the cancellation of one child to cause other children to
552 be cancelled.)
553 """
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200554 if not coros_or_futures:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700555 outer = futures.Future(loop=loop)
556 outer.set_result([])
557 return outer
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200558
559 arg_to_fut = {}
560 for arg in set(coros_or_futures):
561 if not isinstance(arg, futures.Future):
562 fut = async(arg, loop=loop)
563 if loop is None:
564 loop = fut._loop
565 # The caller cannot control this future, the "destroy pending task"
566 # warning should not be emitted.
567 fut._log_destroy_pending = False
568 else:
569 fut = arg
570 if loop is None:
571 loop = fut._loop
572 elif fut._loop is not loop:
573 raise ValueError("futures are tied to different event loops")
574 arg_to_fut[arg] = fut
575
576 children = [arg_to_fut[arg] for arg in coros_or_futures]
577 nchildren = len(children)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700578 outer = _GatheringFuture(children, loop=loop)
579 nfinished = 0
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200580 results = [None] * nchildren
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700581
582 def _done_callback(i, fut):
583 nonlocal nfinished
584 if outer._state != futures._PENDING:
585 if fut._exception is not None:
586 # Mark exception retrieved.
587 fut.exception()
588 return
589 if fut._state == futures._CANCELLED:
590 res = futures.CancelledError()
591 if not return_exceptions:
592 outer.set_exception(res)
593 return
594 elif fut._exception is not None:
595 res = fut.exception() # Mark exception retrieved.
596 if not return_exceptions:
597 outer.set_exception(res)
598 return
599 else:
600 res = fut._result
601 results[i] = res
602 nfinished += 1
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200603 if nfinished == nchildren:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700604 outer.set_result(results)
605
606 for i, fut in enumerate(children):
607 fut.add_done_callback(functools.partial(_done_callback, i))
608 return outer
609
610
611def shield(arg, *, loop=None):
612 """Wait for a future, shielding it from cancellation.
613
614 The statement
615
616 res = yield from shield(something())
617
618 is exactly equivalent to the statement
619
620 res = yield from something()
621
622 *except* that if the coroutine containing it is cancelled, the
623 task running in something() is not cancelled. From the POV of
624 something(), the cancellation did not happen. But its caller is
625 still cancelled, so the yield-from expression still raises
626 CancelledError. Note: If something() is cancelled by other means
627 this will still cancel shield().
628
629 If you want to completely ignore cancellation (not recommended)
630 you can combine shield() with a try/except clause, as follows:
631
632 try:
633 res = yield from shield(something())
634 except CancelledError:
635 res = None
636 """
637 inner = async(arg, loop=loop)
638 if inner.done():
639 # Shortcut.
640 return inner
641 loop = inner._loop
642 outer = futures.Future(loop=loop)
643
644 def _done_callback(inner):
645 if outer.cancelled():
646 # Mark inner's result as retrieved.
647 inner.cancelled() or inner.exception()
648 return
649 if inner.cancelled():
650 outer.cancel()
651 else:
652 exc = inner.exception()
653 if exc is not None:
654 outer.set_exception(exc)
655 else:
656 outer.set_result(inner.result())
657
658 inner.add_done_callback(_done_callback)
659 return outer