blob: 9aebffdaba3e29f172483a9d9f0531cda3d55f39 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
Victor Stinnerf951d282014-06-29 00:46:45 +02003__all__ = ['Task',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
5 'wait', 'wait_for', 'as_completed', 'sleep', 'async',
Guido van Rossumde3a1362013-11-29 09:29:00 -08006 'gather', 'shield',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007 ]
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import concurrent.futures
10import functools
11import inspect
12import linecache
Victor Stinner7ef60cd2014-02-19 23:15:02 +010013import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import traceback
15import weakref
16
Victor Stinnerf951d282014-06-29 00:46:45 +020017from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import events
19from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020020from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021
Victor Stinnera02f81f2014-06-24 22:37:53 +020022_PY34 = (sys.version_info >= (3, 4))
Victor Stinner8d3e02e2014-06-18 01:14:59 +020023
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025class Task(futures.Future):
26 """A coroutine wrapped in a Future."""
27
28 # An important invariant maintained while a Task not done:
29 #
30 # - Either _fut_waiter is None, and _step() is scheduled;
31 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
32 #
33 # The only transition from the latter to the former is through
34 # _wakeup(). When _fut_waiter is not None, one of its callbacks
35 # must be _wakeup().
36
37 # Weak set containing all tasks alive.
38 _all_tasks = weakref.WeakSet()
39
Guido van Rossum1a605ed2013-12-06 12:57:40 -080040 # Dictionary containing tasks that are currently active in
41 # all running event loops. {EventLoop: Task}
42 _current_tasks = {}
43
Victor Stinnerfe22e092014-12-04 23:00:13 +010044 # If False, don't log a message if the task is destroyed whereas its
45 # status is still pending
46 _log_destroy_pending = True
47
Guido van Rossum1a605ed2013-12-06 12:57:40 -080048 @classmethod
49 def current_task(cls, loop=None):
50 """Return the currently running task in an event loop or None.
51
52 By default the current task for the current event loop is returned.
53
54 None is returned when called not in the context of a Task.
55 """
56 if loop is None:
57 loop = events.get_event_loop()
58 return cls._current_tasks.get(loop)
59
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070060 @classmethod
61 def all_tasks(cls, loop=None):
62 """Return a set of all tasks for an event loop.
63
64 By default all tasks for the current event loop are returned.
65 """
66 if loop is None:
67 loop = events.get_event_loop()
68 return {t for t in cls._all_tasks if t._loop is loop}
69
70 def __init__(self, coro, *, loop=None):
Victor Stinnerf951d282014-06-29 00:46:45 +020071 assert coroutines.iscoroutine(coro), repr(coro) # Not a coroutine function!
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070072 super().__init__(loop=loop)
Victor Stinner80f53aa2014-06-27 13:52:20 +020073 if self._source_traceback:
74 del self._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070075 self._coro = iter(coro) # Use the iterator just in case.
76 self._fut_waiter = None
77 self._must_cancel = False
78 self._loop.call_soon(self._step)
79 self.__class__._all_tasks.add(self)
80
R David Murray8e069d52014-09-24 13:13:45 -040081 # On Python 3.3 or older, objects with a destructor that are part of a
82 # reference cycle are never destroyed. That's not the case any more on
83 # Python 3.4 thanks to the PEP 442.
Victor Stinnera02f81f2014-06-24 22:37:53 +020084 if _PY34:
85 def __del__(self):
Victor Stinner98b63912014-06-30 14:51:04 +020086 if self._state == futures._PENDING and self._log_destroy_pending:
Victor Stinner80f53aa2014-06-27 13:52:20 +020087 context = {
Victor Stinnera02f81f2014-06-24 22:37:53 +020088 'task': self,
89 'message': 'Task was destroyed but it is pending!',
Victor Stinner80f53aa2014-06-27 13:52:20 +020090 }
91 if self._source_traceback:
92 context['source_traceback'] = self._source_traceback
93 self._loop.call_exception_handler(context)
Victor Stinnera02f81f2014-06-24 22:37:53 +020094 futures.Future.__del__(self)
95
Victor Stinner313a9802014-07-29 12:58:23 +020096 def _repr_info(self):
97 info = super()._repr_info()
98
Victor Stinner975735f2014-06-25 21:41:58 +020099 if self._must_cancel:
Victor Stinner313a9802014-07-29 12:58:23 +0200100 # replace status
101 info[0] = 'cancelling'
Victor Stinner975735f2014-06-25 21:41:58 +0200102
Victor Stinnerc39ba7d2014-07-11 00:21:27 +0200103 coro = coroutines._format_coroutine(self._coro)
Victor Stinner313a9802014-07-29 12:58:23 +0200104 info.insert(1, 'coro=<%s>' % coro)
Victor Stinner975735f2014-06-25 21:41:58 +0200105
Victor Stinner2dba23a2014-07-03 00:59:00 +0200106 if self._fut_waiter is not None:
Victor Stinner313a9802014-07-29 12:58:23 +0200107 info.insert(2, 'wait_for=%r' % self._fut_waiter)
108 return info
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700109
110 def get_stack(self, *, limit=None):
111 """Return the list of stack frames for this task's coroutine.
112
Victor Stinnerd87de832014-12-02 17:57:04 +0100113 If the coroutine is not done, this returns the stack where it is
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700114 suspended. If the coroutine has completed successfully or was
115 cancelled, this returns an empty list. If the coroutine was
116 terminated by an exception, this returns the list of traceback
117 frames.
118
119 The frames are always ordered from oldest to newest.
120
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500121 The optional limit gives the maximum number of frames to
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700122 return; by default all available frames are returned. Its
123 meaning differs depending on whether a stack or a traceback is
124 returned: the newest frames of a stack are returned, but the
125 oldest frames of a traceback are returned. (This matches the
126 behavior of the traceback module.)
127
128 For reasons beyond our control, only one stack frame is
129 returned for a suspended coroutine.
130 """
131 frames = []
132 f = self._coro.gi_frame
133 if f is not None:
134 while f is not None:
135 if limit is not None:
136 if limit <= 0:
137 break
138 limit -= 1
139 frames.append(f)
140 f = f.f_back
141 frames.reverse()
142 elif self._exception is not None:
143 tb = self._exception.__traceback__
144 while tb is not None:
145 if limit is not None:
146 if limit <= 0:
147 break
148 limit -= 1
149 frames.append(tb.tb_frame)
150 tb = tb.tb_next
151 return frames
152
153 def print_stack(self, *, limit=None, file=None):
154 """Print the stack or traceback for this task's coroutine.
155
156 This produces output similar to that of the traceback module,
157 for the frames retrieved by get_stack(). The limit argument
158 is passed to get_stack(). The file argument is an I/O stream
R David Murray8e069d52014-09-24 13:13:45 -0400159 to which the output is written; by default output is written
160 to sys.stderr.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700161 """
162 extracted_list = []
163 checked = set()
164 for f in self.get_stack(limit=limit):
165 lineno = f.f_lineno
166 co = f.f_code
167 filename = co.co_filename
168 name = co.co_name
169 if filename not in checked:
170 checked.add(filename)
171 linecache.checkcache(filename)
172 line = linecache.getline(filename, lineno, f.f_globals)
173 extracted_list.append((filename, lineno, name, line))
174 exc = self._exception
175 if not extracted_list:
176 print('No stack for %r' % self, file=file)
177 elif exc is not None:
178 print('Traceback for %r (most recent call last):' % self,
179 file=file)
180 else:
181 print('Stack for %r (most recent call last):' % self,
182 file=file)
183 traceback.print_list(extracted_list, file=file)
184 if exc is not None:
185 for line in traceback.format_exception_only(exc.__class__, exc):
186 print(line, file=file, end='')
187
188 def cancel(self):
R David Murray8e069d52014-09-24 13:13:45 -0400189 """Request that this task cancel itself.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200190
Victor Stinner8d213572014-06-02 23:06:46 +0200191 This arranges for a CancelledError to be thrown into the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200192 wrapped coroutine on the next cycle through the event loop.
193 The coroutine then has a chance to clean up or even deny
194 the request using try/except/finally.
195
R David Murray8e069d52014-09-24 13:13:45 -0400196 Unlike Future.cancel, this does not guarantee that the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200197 task will be cancelled: the exception might be caught and
R David Murray8e069d52014-09-24 13:13:45 -0400198 acted upon, delaying cancellation of the task or preventing
199 cancellation completely. The task may also return a value or
200 raise a different exception.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200201
202 Immediately after this method is called, Task.cancelled() will
203 not return True (unless the task was already cancelled). A
204 task will be marked as cancelled when the wrapped coroutine
205 terminates with a CancelledError exception (even if cancel()
206 was not called).
207 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700208 if self.done():
209 return False
210 if self._fut_waiter is not None:
211 if self._fut_waiter.cancel():
212 # Leave self._fut_waiter; it may be a Task that
213 # catches and ignores the cancellation so we may have
214 # to cancel it again later.
215 return True
216 # It must be the case that self._step is already scheduled.
217 self._must_cancel = True
218 return True
219
220 def _step(self, value=None, exc=None):
221 assert not self.done(), \
222 '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
223 if self._must_cancel:
224 if not isinstance(exc, futures.CancelledError):
225 exc = futures.CancelledError()
226 self._must_cancel = False
227 coro = self._coro
228 self._fut_waiter = None
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800229
230 self.__class__._current_tasks[self._loop] = self
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700231 # Call either coro.throw(exc) or coro.send(value).
232 try:
233 if exc is not None:
234 result = coro.throw(exc)
235 elif value is not None:
236 result = coro.send(value)
237 else:
238 result = next(coro)
239 except StopIteration as exc:
240 self.set_result(exc.value)
241 except futures.CancelledError as exc:
242 super().cancel() # I.e., Future.cancel(self).
243 except Exception as exc:
244 self.set_exception(exc)
245 except BaseException as exc:
246 self.set_exception(exc)
247 raise
248 else:
249 if isinstance(result, futures.Future):
250 # Yielded Future must come from Future.__iter__().
251 if result._blocking:
252 result._blocking = False
253 result.add_done_callback(self._wakeup)
254 self._fut_waiter = result
255 if self._must_cancel:
256 if self._fut_waiter.cancel():
257 self._must_cancel = False
258 else:
259 self._loop.call_soon(
260 self._step, None,
261 RuntimeError(
262 'yield was used instead of yield from '
263 'in task {!r} with {!r}'.format(self, result)))
264 elif result is None:
265 # Bare yield relinquishes control for one event loop iteration.
266 self._loop.call_soon(self._step)
267 elif inspect.isgenerator(result):
268 # Yielding a generator is just wrong.
269 self._loop.call_soon(
270 self._step, None,
271 RuntimeError(
272 'yield was used instead of yield from for '
273 'generator in task {!r} with {}'.format(
274 self, result)))
275 else:
276 # Yielding something else is an error.
277 self._loop.call_soon(
278 self._step, None,
279 RuntimeError(
280 'Task got bad yield: {!r}'.format(result)))
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800281 finally:
282 self.__class__._current_tasks.pop(self._loop)
Victor Stinnerd74ac822014-03-04 23:07:08 +0100283 self = None # Needed to break cycles when an exception occurs.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700284
285 def _wakeup(self, future):
286 try:
287 value = future.result()
288 except Exception as exc:
289 # This may also be a cancellation.
290 self._step(None, exc)
291 else:
292 self._step(value, None)
293 self = None # Needed to break cycles when an exception occurs.
294
295
296# wait() and as_completed() similar to those in PEP 3148.
297
298FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
299FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
300ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
301
302
303@coroutine
304def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
305 """Wait for the Futures and coroutines given by fs to complete.
306
Victor Stinnerdb74d982014-06-10 11:16:05 +0200307 The sequence futures must not be empty.
308
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700309 Coroutines will be wrapped in Tasks.
310
311 Returns two sets of Future: (done, pending).
312
313 Usage:
314
315 done, pending = yield from asyncio.wait(fs)
316
317 Note: This does not raise TimeoutError! Futures that aren't done
318 when the timeout occurs are returned in the second set.
319 """
Victor Stinnerf951d282014-06-29 00:46:45 +0200320 if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100321 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700322 if not fs:
323 raise ValueError('Set of coroutines/Futures is empty.')
Victor Stinnere931f7b2014-07-16 18:50:39 +0200324 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
325 raise ValueError('Invalid return_when value: {}'.format(return_when))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700326
327 if loop is None:
328 loop = events.get_event_loop()
329
Yury Selivanov622be342014-02-06 22:06:16 -0500330 fs = {async(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700331
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700332 return (yield from _wait(fs, timeout, return_when, loop))
333
334
Victor Stinner59e08022014-08-28 11:19:25 +0200335def _release_waiter(waiter, *args):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700336 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200337 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700338
339
340@coroutine
341def wait_for(fut, timeout, *, loop=None):
342 """Wait for the single Future or coroutine to complete, with timeout.
343
344 Coroutine will be wrapped in Task.
345
Victor Stinner421e49b2014-01-23 17:40:59 +0100346 Returns result of the Future or coroutine. When a timeout occurs,
347 it cancels the task and raises TimeoutError. To avoid the task
348 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700349
350 Usage:
351
352 result = yield from asyncio.wait_for(fut, 10.0)
353
354 """
355 if loop is None:
356 loop = events.get_event_loop()
357
Guido van Rossum48c66c32014-01-29 14:30:38 -0800358 if timeout is None:
359 return (yield from fut)
360
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700361 waiter = futures.Future(loop=loop)
Victor Stinner59e08022014-08-28 11:19:25 +0200362 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
363 cb = functools.partial(_release_waiter, waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700364
365 fut = async(fut, loop=loop)
366 fut.add_done_callback(cb)
367
368 try:
Victor Stinner59e08022014-08-28 11:19:25 +0200369 # wait until the future completes or the timeout
370 yield from waiter
371
372 if fut.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700373 return fut.result()
374 else:
375 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100376 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700377 raise futures.TimeoutError()
378 finally:
379 timeout_handle.cancel()
380
381
382@coroutine
383def _wait(fs, timeout, return_when, loop):
384 """Internal helper for wait() and _wait_for().
385
386 The fs argument must be a collection of Futures.
387 """
388 assert fs, 'Set of Futures is empty.'
389 waiter = futures.Future(loop=loop)
390 timeout_handle = None
391 if timeout is not None:
392 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
393 counter = len(fs)
394
395 def _on_completion(f):
396 nonlocal counter
397 counter -= 1
398 if (counter <= 0 or
399 return_when == FIRST_COMPLETED or
400 return_when == FIRST_EXCEPTION and (not f.cancelled() and
401 f.exception() is not None)):
402 if timeout_handle is not None:
403 timeout_handle.cancel()
404 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200405 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700406
407 for f in fs:
408 f.add_done_callback(_on_completion)
409
410 try:
411 yield from waiter
412 finally:
413 if timeout_handle is not None:
414 timeout_handle.cancel()
415
416 done, pending = set(), set()
417 for f in fs:
418 f.remove_done_callback(_on_completion)
419 if f.done():
420 done.add(f)
421 else:
422 pending.add(f)
423 return done, pending
424
425
426# This is *not* a @coroutine! It is just an iterator (yielding Futures).
427def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800428 """Return an iterator whose values are coroutines.
429
430 When waiting for the yielded coroutines you'll get the results (or
431 exceptions!) of the original Futures (or coroutines), in the order
432 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700433
434 This differs from PEP 3148; the proper way to use this is:
435
436 for f in as_completed(fs):
437 result = yield from f # The 'yield from' may raise.
438 # Use result.
439
Guido van Rossumb58f0532014-02-12 17:58:19 -0800440 If a timeout is specified, the 'yield from' will raise
441 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700442
443 Note: The futures 'f' are not necessarily members of fs.
444 """
Victor Stinnerf951d282014-06-29 00:46:45 +0200445 if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100446 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700447 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov622be342014-02-06 22:06:16 -0500448 todo = {async(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800449 from .queues import Queue # Import here to avoid circular import problem.
450 done = Queue(loop=loop)
451 timeout_handle = None
452
453 def _on_timeout():
454 for f in todo:
455 f.remove_done_callback(_on_completion)
456 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
457 todo.clear() # Can't do todo.remove(f) in the loop.
458
459 def _on_completion(f):
460 if not todo:
461 return # _on_timeout() was here first.
462 todo.remove(f)
463 done.put_nowait(f)
464 if not todo and timeout_handle is not None:
465 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700466
467 @coroutine
468 def _wait_for_one():
Guido van Rossumb58f0532014-02-12 17:58:19 -0800469 f = yield from done.get()
470 if f is None:
471 # Dummy value from _on_timeout().
472 raise futures.TimeoutError
473 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700474
Guido van Rossumb58f0532014-02-12 17:58:19 -0800475 for f in todo:
476 f.add_done_callback(_on_completion)
477 if todo and timeout is not None:
478 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700479 for _ in range(len(todo)):
480 yield _wait_for_one()
481
482
483@coroutine
484def sleep(delay, result=None, *, loop=None):
485 """Coroutine that completes after a given time (in seconds)."""
486 future = futures.Future(loop=loop)
Victor Stinnera9acbe82014-07-05 15:29:41 +0200487 h = future._loop.call_later(delay,
488 future._set_result_unless_cancelled, result)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700489 try:
490 return (yield from future)
491 finally:
492 h.cancel()
493
494
495def async(coro_or_future, *, loop=None):
496 """Wrap a coroutine in a future.
497
498 If the argument is a Future, it is returned directly.
499 """
500 if isinstance(coro_or_future, futures.Future):
501 if loop is not None and loop is not coro_or_future._loop:
502 raise ValueError('loop argument must agree with Future')
503 return coro_or_future
Victor Stinnerf951d282014-06-29 00:46:45 +0200504 elif coroutines.iscoroutine(coro_or_future):
Victor Stinner896a25a2014-07-08 11:29:25 +0200505 if loop is None:
506 loop = events.get_event_loop()
507 task = loop.create_task(coro_or_future)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200508 if task._source_traceback:
509 del task._source_traceback[-1]
510 return task
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700511 else:
512 raise TypeError('A Future or coroutine is required')
513
514
515class _GatheringFuture(futures.Future):
516 """Helper for gather().
517
518 This overrides cancel() to cancel all the children and act more
519 like Task.cancel(), which doesn't immediately mark itself as
520 cancelled.
521 """
522
523 def __init__(self, children, *, loop=None):
524 super().__init__(loop=loop)
525 self._children = children
526
527 def cancel(self):
528 if self.done():
529 return False
530 for child in self._children:
531 child.cancel()
532 return True
533
534
535def gather(*coros_or_futures, loop=None, return_exceptions=False):
536 """Return a future aggregating results from the given coroutines
537 or futures.
538
539 All futures must share the same event loop. If all the tasks are
540 done successfully, the returned future's result is the list of
541 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500542 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700543 exceptions in the tasks are treated the same as successful
544 results, and gathered in the result list; otherwise, the first
545 raised exception will be immediately propagated to the returned
546 future.
547
548 Cancellation: if the outer Future is cancelled, all children (that
549 have not completed yet) are also cancelled. If any child is
550 cancelled, this is treated as if it raised CancelledError --
551 the outer Future is *not* cancelled in this case. (This is to
552 prevent the cancellation of one child to cause other children to
553 be cancelled.)
554 """
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200555 if not coros_or_futures:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700556 outer = futures.Future(loop=loop)
557 outer.set_result([])
558 return outer
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200559
560 arg_to_fut = {}
561 for arg in set(coros_or_futures):
562 if not isinstance(arg, futures.Future):
563 fut = async(arg, loop=loop)
564 if loop is None:
565 loop = fut._loop
566 # The caller cannot control this future, the "destroy pending task"
567 # warning should not be emitted.
568 fut._log_destroy_pending = False
569 else:
570 fut = arg
571 if loop is None:
572 loop = fut._loop
573 elif fut._loop is not loop:
574 raise ValueError("futures are tied to different event loops")
575 arg_to_fut[arg] = fut
576
577 children = [arg_to_fut[arg] for arg in coros_or_futures]
578 nchildren = len(children)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700579 outer = _GatheringFuture(children, loop=loop)
580 nfinished = 0
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200581 results = [None] * nchildren
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700582
583 def _done_callback(i, fut):
584 nonlocal nfinished
585 if outer._state != futures._PENDING:
586 if fut._exception is not None:
587 # Mark exception retrieved.
588 fut.exception()
589 return
590 if fut._state == futures._CANCELLED:
591 res = futures.CancelledError()
592 if not return_exceptions:
593 outer.set_exception(res)
594 return
595 elif fut._exception is not None:
596 res = fut.exception() # Mark exception retrieved.
597 if not return_exceptions:
598 outer.set_exception(res)
599 return
600 else:
601 res = fut._result
602 results[i] = res
603 nfinished += 1
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200604 if nfinished == nchildren:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700605 outer.set_result(results)
606
607 for i, fut in enumerate(children):
608 fut.add_done_callback(functools.partial(_done_callback, i))
609 return outer
610
611
612def shield(arg, *, loop=None):
613 """Wait for a future, shielding it from cancellation.
614
615 The statement
616
617 res = yield from shield(something())
618
619 is exactly equivalent to the statement
620
621 res = yield from something()
622
623 *except* that if the coroutine containing it is cancelled, the
624 task running in something() is not cancelled. From the POV of
625 something(), the cancellation did not happen. But its caller is
626 still cancelled, so the yield-from expression still raises
627 CancelledError. Note: If something() is cancelled by other means
628 this will still cancel shield().
629
630 If you want to completely ignore cancellation (not recommended)
631 you can combine shield() with a try/except clause, as follows:
632
633 try:
634 res = yield from shield(something())
635 except CancelledError:
636 res = None
637 """
638 inner = async(arg, loop=loop)
639 if inner.done():
640 # Shortcut.
641 return inner
642 loop = inner._loop
643 outer = futures.Future(loop=loop)
644
645 def _done_callback(inner):
646 if outer.cancelled():
647 # Mark inner's result as retrieved.
648 inner.cancelled() or inner.exception()
649 return
650 if inner.cancelled():
651 outer.cancel()
652 else:
653 exc = inner.exception()
654 if exc is not None:
655 outer.set_exception(exc)
656 else:
657 outer.set_result(inner.result())
658
659 inner.add_done_callback(_done_callback)
660 return outer