blob: 52ca33a8c3ef013ca694dba8323e80d0c72f3114 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
3__all__ = ['coroutine', 'Task',
Guido van Rossum68816ef2013-12-28 08:06:40 -10004 'iscoroutinefunction', 'iscoroutine',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
6 'wait', 'wait_for', 'as_completed', 'sleep', 'async',
Guido van Rossumde3a1362013-11-29 09:29:00 -08007 'gather', 'shield',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008 ]
9
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010import concurrent.futures
11import functools
12import inspect
13import linecache
Victor Stinner7ef60cd2014-02-19 23:15:02 +010014import os
15import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import traceback
17import weakref
18
19from . import events
20from . import futures
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
23# If you set _DEBUG to true, @coroutine will wrap the resulting
24# generator objects in a CoroWrapper instance (defined below). That
25# instance will log a message when the generator is never iterated
26# over, which may happen when you forget to use "yield from" with a
27# coroutine call. Note that the value of the _DEBUG flag is taken
28# when the decorator is used, so to be of any use it must be set
29# before you define your coroutines. A downside of using this feature
30# is that tracebacks show entries for the CoroWrapper.__next__ method
31# when _DEBUG is true.
Victor Stinner7ef60cd2014-02-19 23:15:02 +010032_DEBUG = (not sys.flags.ignore_environment
33 and bool(os.environ.get('PYTHONASYNCIODEBUG')))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070034
Victor Stinnera02f81f2014-06-24 22:37:53 +020035_PY34 = (sys.version_info >= (3, 4))
Victor Stinner8d3e02e2014-06-18 01:14:59 +020036_PY35 = (sys.version_info >= (3, 5))
37
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038
39class CoroWrapper:
Guido van Rossume1f55442014-01-16 11:05:23 -080040 # Wrapper for coroutine in _DEBUG mode.
41
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070042 def __init__(self, gen, func):
43 assert inspect.isgenerator(gen), gen
44 self.gen = gen
45 self.func = func
46
47 def __iter__(self):
48 return self
49
50 def __next__(self):
51 return next(self.gen)
52
Yury Selivanovf15f7482014-04-14 22:21:52 -040053 def send(self, *value):
54 # We use `*value` because of a bug in CPythons prior
55 # to 3.4.1. See issue #21209 and test_yield_from_corowrapper
56 # for details. This workaround should be removed in 3.5.0.
Yury Selivanov09cc1692014-04-15 12:01:16 -040057 if len(value) == 1:
58 value = value[0]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070059 return self.gen.send(value)
60
61 def throw(self, exc):
62 return self.gen.throw(exc)
63
64 def close(self):
65 return self.gen.close()
66
Guido van Rossum0cbc7682014-04-15 12:06:34 -070067 @property
68 def gi_frame(self):
69 return self.gen.gi_frame
70
71 @property
72 def gi_running(self):
73 return self.gen.gi_running
74
75 @property
76 def gi_code(self):
77 return self.gen.gi_code
78
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070079 def __del__(self):
Guido van Rossum83c1ddd2014-04-27 10:33:58 -070080 # Be careful accessing self.gen.frame -- self.gen might not exist.
81 gen = getattr(self, 'gen', None)
82 frame = getattr(gen, 'gi_frame', None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070083 if frame is not None and frame.f_lasti == -1:
84 func = self.func
85 code = func.__code__
86 filename = code.co_filename
87 lineno = code.co_firstlineno
Guido van Rossum2b430b82013-11-01 14:13:30 -070088 logger.error(
89 'Coroutine %r defined at %s:%s was never yielded from',
90 func.__name__, filename, lineno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070091
92
93def coroutine(func):
94 """Decorator to mark coroutines.
95
96 If the coroutine is not yielded from before it is destroyed,
97 an error message is logged.
98 """
99 if inspect.isgeneratorfunction(func):
100 coro = func
101 else:
102 @functools.wraps(func)
103 def coro(*args, **kw):
104 res = func(*args, **kw)
105 if isinstance(res, futures.Future) or inspect.isgenerator(res):
106 res = yield from res
107 return res
108
109 if not _DEBUG:
110 wrapper = coro
111 else:
112 @functools.wraps(func)
113 def wrapper(*args, **kwds):
114 w = CoroWrapper(coro(*args, **kwds), func)
Victor Stinner8d3e02e2014-06-18 01:14:59 +0200115 w.__name__ = func.__name__
116 if _PY35:
117 w.__qualname__ = func.__qualname__
118 w.__doc__ = func.__doc__
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700119 return w
120
121 wrapper._is_coroutine = True # For iscoroutinefunction().
122 return wrapper
123
124
125def iscoroutinefunction(func):
126 """Return True if func is a decorated coroutine function."""
127 return getattr(func, '_is_coroutine', False)
128
129
130def iscoroutine(obj):
131 """Return True if obj is a coroutine object."""
132 return isinstance(obj, CoroWrapper) or inspect.isgenerator(obj)
133
134
Victor Stinner975735f2014-06-25 21:41:58 +0200135def _format_coroutine(coro):
136 assert iscoroutine(coro)
137 if _PY35:
138 coro_name = coro.__qualname__
139 else:
140 coro_name = coro.__name__
141
142 filename = coro.gi_code.co_filename
143 if coro.gi_frame is not None:
144 lineno = coro.gi_frame.f_lineno
145 return '%s() at %s:%s' % (coro_name, filename, lineno)
146 else:
147 lineno = coro.gi_code.co_firstlineno
148 return '%s() done at %s:%s' % (coro_name, filename, lineno)
149
150
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700151class Task(futures.Future):
152 """A coroutine wrapped in a Future."""
153
154 # An important invariant maintained while a Task not done:
155 #
156 # - Either _fut_waiter is None, and _step() is scheduled;
157 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
158 #
159 # The only transition from the latter to the former is through
160 # _wakeup(). When _fut_waiter is not None, one of its callbacks
161 # must be _wakeup().
162
163 # Weak set containing all tasks alive.
164 _all_tasks = weakref.WeakSet()
165
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800166 # Dictionary containing tasks that are currently active in
167 # all running event loops. {EventLoop: Task}
168 _current_tasks = {}
169
170 @classmethod
171 def current_task(cls, loop=None):
172 """Return the currently running task in an event loop or None.
173
174 By default the current task for the current event loop is returned.
175
176 None is returned when called not in the context of a Task.
177 """
178 if loop is None:
179 loop = events.get_event_loop()
180 return cls._current_tasks.get(loop)
181
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700182 @classmethod
183 def all_tasks(cls, loop=None):
184 """Return a set of all tasks for an event loop.
185
186 By default all tasks for the current event loop are returned.
187 """
188 if loop is None:
189 loop = events.get_event_loop()
190 return {t for t in cls._all_tasks if t._loop is loop}
191
192 def __init__(self, coro, *, loop=None):
193 assert iscoroutine(coro), repr(coro) # Not a coroutine function!
194 super().__init__(loop=loop)
195 self._coro = iter(coro) # Use the iterator just in case.
196 self._fut_waiter = None
197 self._must_cancel = False
198 self._loop.call_soon(self._step)
199 self.__class__._all_tasks.add(self)
200
Victor Stinnera02f81f2014-06-24 22:37:53 +0200201 # On Python 3.3 or older, objects with a destructor part of a reference
202 # cycle are never destroyed. It's not more the case on Python 3.4 thanks to
203 # the PEP 442.
204 if _PY34:
205 def __del__(self):
206 if self._state == futures._PENDING:
207 self._loop.call_exception_handler({
208 'task': self,
209 'message': 'Task was destroyed but it is pending!',
210 })
211 futures.Future.__del__(self)
212
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700213 def __repr__(self):
Victor Stinner975735f2014-06-25 21:41:58 +0200214 info = []
215 if self._must_cancel:
216 info.append('cancelling')
217 else:
218 info.append(self._state.lower())
219
220 info.append(_format_coroutine(self._coro))
221
222 if self._state == futures._FINISHED:
223 info.append(self._format_result())
224
225 if self._callbacks:
226 info.append(self._format_callbacks())
227
228 return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700229
230 def get_stack(self, *, limit=None):
231 """Return the list of stack frames for this task's coroutine.
232
233 If the coroutine is active, this returns the stack where it is
234 suspended. If the coroutine has completed successfully or was
235 cancelled, this returns an empty list. If the coroutine was
236 terminated by an exception, this returns the list of traceback
237 frames.
238
239 The frames are always ordered from oldest to newest.
240
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500241 The optional limit gives the maximum number of frames to
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700242 return; by default all available frames are returned. Its
243 meaning differs depending on whether a stack or a traceback is
244 returned: the newest frames of a stack are returned, but the
245 oldest frames of a traceback are returned. (This matches the
246 behavior of the traceback module.)
247
248 For reasons beyond our control, only one stack frame is
249 returned for a suspended coroutine.
250 """
251 frames = []
252 f = self._coro.gi_frame
253 if f is not None:
254 while f is not None:
255 if limit is not None:
256 if limit <= 0:
257 break
258 limit -= 1
259 frames.append(f)
260 f = f.f_back
261 frames.reverse()
262 elif self._exception is not None:
263 tb = self._exception.__traceback__
264 while tb is not None:
265 if limit is not None:
266 if limit <= 0:
267 break
268 limit -= 1
269 frames.append(tb.tb_frame)
270 tb = tb.tb_next
271 return frames
272
273 def print_stack(self, *, limit=None, file=None):
274 """Print the stack or traceback for this task's coroutine.
275
276 This produces output similar to that of the traceback module,
277 for the frames retrieved by get_stack(). The limit argument
278 is passed to get_stack(). The file argument is an I/O stream
279 to which the output goes; by default it goes to sys.stderr.
280 """
281 extracted_list = []
282 checked = set()
283 for f in self.get_stack(limit=limit):
284 lineno = f.f_lineno
285 co = f.f_code
286 filename = co.co_filename
287 name = co.co_name
288 if filename not in checked:
289 checked.add(filename)
290 linecache.checkcache(filename)
291 line = linecache.getline(filename, lineno, f.f_globals)
292 extracted_list.append((filename, lineno, name, line))
293 exc = self._exception
294 if not extracted_list:
295 print('No stack for %r' % self, file=file)
296 elif exc is not None:
297 print('Traceback for %r (most recent call last):' % self,
298 file=file)
299 else:
300 print('Stack for %r (most recent call last):' % self,
301 file=file)
302 traceback.print_list(extracted_list, file=file)
303 if exc is not None:
304 for line in traceback.format_exception_only(exc.__class__, exc):
305 print(line, file=file, end='')
306
307 def cancel(self):
Victor Stinner8d213572014-06-02 23:06:46 +0200308 """Request this task to cancel itself.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200309
Victor Stinner8d213572014-06-02 23:06:46 +0200310 This arranges for a CancelledError to be thrown into the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200311 wrapped coroutine on the next cycle through the event loop.
312 The coroutine then has a chance to clean up or even deny
313 the request using try/except/finally.
314
315 Contrary to Future.cancel(), this does not guarantee that the
316 task will be cancelled: the exception might be caught and
317 acted upon, delaying cancellation of the task or preventing it
318 completely. The task may also return a value or raise a
319 different exception.
320
321 Immediately after this method is called, Task.cancelled() will
322 not return True (unless the task was already cancelled). A
323 task will be marked as cancelled when the wrapped coroutine
324 terminates with a CancelledError exception (even if cancel()
325 was not called).
326 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700327 if self.done():
328 return False
329 if self._fut_waiter is not None:
330 if self._fut_waiter.cancel():
331 # Leave self._fut_waiter; it may be a Task that
332 # catches and ignores the cancellation so we may have
333 # to cancel it again later.
334 return True
335 # It must be the case that self._step is already scheduled.
336 self._must_cancel = True
337 return True
338
339 def _step(self, value=None, exc=None):
340 assert not self.done(), \
341 '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
342 if self._must_cancel:
343 if not isinstance(exc, futures.CancelledError):
344 exc = futures.CancelledError()
345 self._must_cancel = False
346 coro = self._coro
347 self._fut_waiter = None
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800348
349 self.__class__._current_tasks[self._loop] = self
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700350 # Call either coro.throw(exc) or coro.send(value).
351 try:
352 if exc is not None:
353 result = coro.throw(exc)
354 elif value is not None:
355 result = coro.send(value)
356 else:
357 result = next(coro)
358 except StopIteration as exc:
359 self.set_result(exc.value)
360 except futures.CancelledError as exc:
361 super().cancel() # I.e., Future.cancel(self).
362 except Exception as exc:
363 self.set_exception(exc)
364 except BaseException as exc:
365 self.set_exception(exc)
366 raise
367 else:
368 if isinstance(result, futures.Future):
369 # Yielded Future must come from Future.__iter__().
370 if result._blocking:
371 result._blocking = False
372 result.add_done_callback(self._wakeup)
373 self._fut_waiter = result
374 if self._must_cancel:
375 if self._fut_waiter.cancel():
376 self._must_cancel = False
377 else:
378 self._loop.call_soon(
379 self._step, None,
380 RuntimeError(
381 'yield was used instead of yield from '
382 'in task {!r} with {!r}'.format(self, result)))
383 elif result is None:
384 # Bare yield relinquishes control for one event loop iteration.
385 self._loop.call_soon(self._step)
386 elif inspect.isgenerator(result):
387 # Yielding a generator is just wrong.
388 self._loop.call_soon(
389 self._step, None,
390 RuntimeError(
391 'yield was used instead of yield from for '
392 'generator in task {!r} with {}'.format(
393 self, result)))
394 else:
395 # Yielding something else is an error.
396 self._loop.call_soon(
397 self._step, None,
398 RuntimeError(
399 'Task got bad yield: {!r}'.format(result)))
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800400 finally:
401 self.__class__._current_tasks.pop(self._loop)
Victor Stinnerd74ac822014-03-04 23:07:08 +0100402 self = None # Needed to break cycles when an exception occurs.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700403
404 def _wakeup(self, future):
405 try:
406 value = future.result()
407 except Exception as exc:
408 # This may also be a cancellation.
409 self._step(None, exc)
410 else:
411 self._step(value, None)
412 self = None # Needed to break cycles when an exception occurs.
413
414
415# wait() and as_completed() similar to those in PEP 3148.
416
417FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
418FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
419ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
420
421
422@coroutine
423def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
424 """Wait for the Futures and coroutines given by fs to complete.
425
Victor Stinnerdb74d982014-06-10 11:16:05 +0200426 The sequence futures must not be empty.
427
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700428 Coroutines will be wrapped in Tasks.
429
430 Returns two sets of Future: (done, pending).
431
432 Usage:
433
434 done, pending = yield from asyncio.wait(fs)
435
436 Note: This does not raise TimeoutError! Futures that aren't done
437 when the timeout occurs are returned in the second set.
438 """
Victor Stinnereb748762014-02-11 11:54:08 +0100439 if isinstance(fs, futures.Future) or iscoroutine(fs):
440 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700441 if not fs:
442 raise ValueError('Set of coroutines/Futures is empty.')
443
444 if loop is None:
445 loop = events.get_event_loop()
446
Yury Selivanov622be342014-02-06 22:06:16 -0500447 fs = {async(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700448
449 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
450 raise ValueError('Invalid return_when value: {}'.format(return_when))
451 return (yield from _wait(fs, timeout, return_when, loop))
452
453
454def _release_waiter(waiter, value=True, *args):
455 if not waiter.done():
456 waiter.set_result(value)
457
458
459@coroutine
460def wait_for(fut, timeout, *, loop=None):
461 """Wait for the single Future or coroutine to complete, with timeout.
462
463 Coroutine will be wrapped in Task.
464
Victor Stinner421e49b2014-01-23 17:40:59 +0100465 Returns result of the Future or coroutine. When a timeout occurs,
466 it cancels the task and raises TimeoutError. To avoid the task
467 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700468
469 Usage:
470
471 result = yield from asyncio.wait_for(fut, 10.0)
472
473 """
474 if loop is None:
475 loop = events.get_event_loop()
476
Guido van Rossum48c66c32014-01-29 14:30:38 -0800477 if timeout is None:
478 return (yield from fut)
479
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700480 waiter = futures.Future(loop=loop)
481 timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False)
482 cb = functools.partial(_release_waiter, waiter, True)
483
484 fut = async(fut, loop=loop)
485 fut.add_done_callback(cb)
486
487 try:
488 if (yield from waiter):
489 return fut.result()
490 else:
491 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100492 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700493 raise futures.TimeoutError()
494 finally:
495 timeout_handle.cancel()
496
497
498@coroutine
499def _wait(fs, timeout, return_when, loop):
500 """Internal helper for wait() and _wait_for().
501
502 The fs argument must be a collection of Futures.
503 """
504 assert fs, 'Set of Futures is empty.'
505 waiter = futures.Future(loop=loop)
506 timeout_handle = None
507 if timeout is not None:
508 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
509 counter = len(fs)
510
511 def _on_completion(f):
512 nonlocal counter
513 counter -= 1
514 if (counter <= 0 or
515 return_when == FIRST_COMPLETED or
516 return_when == FIRST_EXCEPTION and (not f.cancelled() and
517 f.exception() is not None)):
518 if timeout_handle is not None:
519 timeout_handle.cancel()
520 if not waiter.done():
521 waiter.set_result(False)
522
523 for f in fs:
524 f.add_done_callback(_on_completion)
525
526 try:
527 yield from waiter
528 finally:
529 if timeout_handle is not None:
530 timeout_handle.cancel()
531
532 done, pending = set(), set()
533 for f in fs:
534 f.remove_done_callback(_on_completion)
535 if f.done():
536 done.add(f)
537 else:
538 pending.add(f)
539 return done, pending
540
541
542# This is *not* a @coroutine! It is just an iterator (yielding Futures).
543def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800544 """Return an iterator whose values are coroutines.
545
546 When waiting for the yielded coroutines you'll get the results (or
547 exceptions!) of the original Futures (or coroutines), in the order
548 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700549
550 This differs from PEP 3148; the proper way to use this is:
551
552 for f in as_completed(fs):
553 result = yield from f # The 'yield from' may raise.
554 # Use result.
555
Guido van Rossumb58f0532014-02-12 17:58:19 -0800556 If a timeout is specified, the 'yield from' will raise
557 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700558
559 Note: The futures 'f' are not necessarily members of fs.
560 """
Victor Stinnereb748762014-02-11 11:54:08 +0100561 if isinstance(fs, futures.Future) or iscoroutine(fs):
562 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700563 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov622be342014-02-06 22:06:16 -0500564 todo = {async(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800565 from .queues import Queue # Import here to avoid circular import problem.
566 done = Queue(loop=loop)
567 timeout_handle = None
568
569 def _on_timeout():
570 for f in todo:
571 f.remove_done_callback(_on_completion)
572 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
573 todo.clear() # Can't do todo.remove(f) in the loop.
574
575 def _on_completion(f):
576 if not todo:
577 return # _on_timeout() was here first.
578 todo.remove(f)
579 done.put_nowait(f)
580 if not todo and timeout_handle is not None:
581 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700582
583 @coroutine
584 def _wait_for_one():
Guido van Rossumb58f0532014-02-12 17:58:19 -0800585 f = yield from done.get()
586 if f is None:
587 # Dummy value from _on_timeout().
588 raise futures.TimeoutError
589 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700590
Guido van Rossumb58f0532014-02-12 17:58:19 -0800591 for f in todo:
592 f.add_done_callback(_on_completion)
593 if todo and timeout is not None:
594 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700595 for _ in range(len(todo)):
596 yield _wait_for_one()
597
598
599@coroutine
600def sleep(delay, result=None, *, loop=None):
601 """Coroutine that completes after a given time (in seconds)."""
602 future = futures.Future(loop=loop)
603 h = future._loop.call_later(delay, future.set_result, result)
604 try:
605 return (yield from future)
606 finally:
607 h.cancel()
608
609
610def async(coro_or_future, *, loop=None):
611 """Wrap a coroutine in a future.
612
613 If the argument is a Future, it is returned directly.
614 """
615 if isinstance(coro_or_future, futures.Future):
616 if loop is not None and loop is not coro_or_future._loop:
617 raise ValueError('loop argument must agree with Future')
618 return coro_or_future
619 elif iscoroutine(coro_or_future):
620 return Task(coro_or_future, loop=loop)
621 else:
622 raise TypeError('A Future or coroutine is required')
623
624
625class _GatheringFuture(futures.Future):
626 """Helper for gather().
627
628 This overrides cancel() to cancel all the children and act more
629 like Task.cancel(), which doesn't immediately mark itself as
630 cancelled.
631 """
632
633 def __init__(self, children, *, loop=None):
634 super().__init__(loop=loop)
635 self._children = children
636
637 def cancel(self):
638 if self.done():
639 return False
640 for child in self._children:
641 child.cancel()
642 return True
643
644
645def gather(*coros_or_futures, loop=None, return_exceptions=False):
646 """Return a future aggregating results from the given coroutines
647 or futures.
648
649 All futures must share the same event loop. If all the tasks are
650 done successfully, the returned future's result is the list of
651 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500652 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700653 exceptions in the tasks are treated the same as successful
654 results, and gathered in the result list; otherwise, the first
655 raised exception will be immediately propagated to the returned
656 future.
657
658 Cancellation: if the outer Future is cancelled, all children (that
659 have not completed yet) are also cancelled. If any child is
660 cancelled, this is treated as if it raised CancelledError --
661 the outer Future is *not* cancelled in this case. (This is to
662 prevent the cancellation of one child to cause other children to
663 be cancelled.)
664 """
Yury Selivanov622be342014-02-06 22:06:16 -0500665 arg_to_fut = {arg: async(arg, loop=loop) for arg in set(coros_or_futures)}
666 children = [arg_to_fut[arg] for arg in coros_or_futures]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700667 n = len(children)
668 if n == 0:
669 outer = futures.Future(loop=loop)
670 outer.set_result([])
671 return outer
672 if loop is None:
673 loop = children[0]._loop
674 for fut in children:
675 if fut._loop is not loop:
676 raise ValueError("futures are tied to different event loops")
677 outer = _GatheringFuture(children, loop=loop)
678 nfinished = 0
679 results = [None] * n
680
681 def _done_callback(i, fut):
682 nonlocal nfinished
683 if outer._state != futures._PENDING:
684 if fut._exception is not None:
685 # Mark exception retrieved.
686 fut.exception()
687 return
688 if fut._state == futures._CANCELLED:
689 res = futures.CancelledError()
690 if not return_exceptions:
691 outer.set_exception(res)
692 return
693 elif fut._exception is not None:
694 res = fut.exception() # Mark exception retrieved.
695 if not return_exceptions:
696 outer.set_exception(res)
697 return
698 else:
699 res = fut._result
700 results[i] = res
701 nfinished += 1
702 if nfinished == n:
703 outer.set_result(results)
704
705 for i, fut in enumerate(children):
706 fut.add_done_callback(functools.partial(_done_callback, i))
707 return outer
708
709
710def shield(arg, *, loop=None):
711 """Wait for a future, shielding it from cancellation.
712
713 The statement
714
715 res = yield from shield(something())
716
717 is exactly equivalent to the statement
718
719 res = yield from something()
720
721 *except* that if the coroutine containing it is cancelled, the
722 task running in something() is not cancelled. From the POV of
723 something(), the cancellation did not happen. But its caller is
724 still cancelled, so the yield-from expression still raises
725 CancelledError. Note: If something() is cancelled by other means
726 this will still cancel shield().
727
728 If you want to completely ignore cancellation (not recommended)
729 you can combine shield() with a try/except clause, as follows:
730
731 try:
732 res = yield from shield(something())
733 except CancelledError:
734 res = None
735 """
736 inner = async(arg, loop=loop)
737 if inner.done():
738 # Shortcut.
739 return inner
740 loop = inner._loop
741 outer = futures.Future(loop=loop)
742
743 def _done_callback(inner):
744 if outer.cancelled():
745 # Mark inner's result as retrieved.
746 inner.cancelled() or inner.exception()
747 return
748 if inner.cancelled():
749 outer.cancel()
750 else:
751 exc = inner.exception()
752 if exc is not None:
753 outer.set_exception(exc)
754 else:
755 outer.set_result(inner.result())
756
757 inner.add_done_callback(_done_callback)
758 return outer