blob: 63cbcda32fe9d1fbb15c8f9f8d44497fe453db3b [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
Victor Stinnerf951d282014-06-29 00:46:45 +02003__all__ = ['Task',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
5 'wait', 'wait_for', 'as_completed', 'sleep', 'async',
Guido van Rossum841d9ee2015-10-03 08:31:42 -07006 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007 ]
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import concurrent.futures
10import functools
11import inspect
12import linecache
13import traceback
Yury Selivanov59eb9a42015-05-11 14:48:38 -040014import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015import weakref
16
Victor Stinner71080fc2015-07-25 02:23:21 +020017from . import compat
Victor Stinnerf951d282014-06-29 00:46:45 +020018from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019from . import events
20from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020021from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024class Task(futures.Future):
25 """A coroutine wrapped in a Future."""
26
27 # An important invariant maintained while a Task not done:
28 #
29 # - Either _fut_waiter is None, and _step() is scheduled;
30 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
31 #
32 # The only transition from the latter to the former is through
33 # _wakeup(). When _fut_waiter is not None, one of its callbacks
34 # must be _wakeup().
35
36 # Weak set containing all tasks alive.
37 _all_tasks = weakref.WeakSet()
38
Guido van Rossum1a605ed2013-12-06 12:57:40 -080039 # Dictionary containing tasks that are currently active in
40 # all running event loops. {EventLoop: Task}
41 _current_tasks = {}
42
Victor Stinnerfe22e092014-12-04 23:00:13 +010043 # If False, don't log a message if the task is destroyed whereas its
44 # status is still pending
45 _log_destroy_pending = True
46
Guido van Rossum1a605ed2013-12-06 12:57:40 -080047 @classmethod
48 def current_task(cls, loop=None):
49 """Return the currently running task in an event loop or None.
50
51 By default the current task for the current event loop is returned.
52
53 None is returned when called not in the context of a Task.
54 """
55 if loop is None:
56 loop = events.get_event_loop()
57 return cls._current_tasks.get(loop)
58
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070059 @classmethod
60 def all_tasks(cls, loop=None):
61 """Return a set of all tasks for an event loop.
62
63 By default all tasks for the current event loop are returned.
64 """
65 if loop is None:
66 loop = events.get_event_loop()
67 return {t for t in cls._all_tasks if t._loop is loop}
68
69 def __init__(self, coro, *, loop=None):
Victor Stinner15cc6782015-01-09 00:09:10 +010070 assert coroutines.iscoroutine(coro), repr(coro)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070071 super().__init__(loop=loop)
Victor Stinner80f53aa2014-06-27 13:52:20 +020072 if self._source_traceback:
73 del self._source_traceback[-1]
Yury Selivanov1ad08a52015-05-28 10:52:19 -040074 self._coro = coro
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070075 self._fut_waiter = None
76 self._must_cancel = False
77 self._loop.call_soon(self._step)
78 self.__class__._all_tasks.add(self)
79
R David Murray8e069d52014-09-24 13:13:45 -040080 # On Python 3.3 or older, objects with a destructor that are part of a
81 # reference cycle are never destroyed. That's not the case any more on
82 # Python 3.4 thanks to the PEP 442.
Victor Stinner71080fc2015-07-25 02:23:21 +020083 if compat.PY34:
Victor Stinnera02f81f2014-06-24 22:37:53 +020084 def __del__(self):
Victor Stinner98b63912014-06-30 14:51:04 +020085 if self._state == futures._PENDING and self._log_destroy_pending:
Victor Stinner80f53aa2014-06-27 13:52:20 +020086 context = {
Victor Stinnera02f81f2014-06-24 22:37:53 +020087 'task': self,
88 'message': 'Task was destroyed but it is pending!',
Victor Stinner80f53aa2014-06-27 13:52:20 +020089 }
90 if self._source_traceback:
91 context['source_traceback'] = self._source_traceback
92 self._loop.call_exception_handler(context)
Victor Stinnera02f81f2014-06-24 22:37:53 +020093 futures.Future.__del__(self)
94
Victor Stinner313a9802014-07-29 12:58:23 +020095 def _repr_info(self):
96 info = super()._repr_info()
97
Victor Stinner975735f2014-06-25 21:41:58 +020098 if self._must_cancel:
Victor Stinner313a9802014-07-29 12:58:23 +020099 # replace status
100 info[0] = 'cancelling'
Victor Stinner975735f2014-06-25 21:41:58 +0200101
Victor Stinnerc39ba7d2014-07-11 00:21:27 +0200102 coro = coroutines._format_coroutine(self._coro)
Victor Stinner313a9802014-07-29 12:58:23 +0200103 info.insert(1, 'coro=<%s>' % coro)
Victor Stinner975735f2014-06-25 21:41:58 +0200104
Victor Stinner2dba23a2014-07-03 00:59:00 +0200105 if self._fut_waiter is not None:
Victor Stinner313a9802014-07-29 12:58:23 +0200106 info.insert(2, 'wait_for=%r' % self._fut_waiter)
107 return info
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700108
109 def get_stack(self, *, limit=None):
110 """Return the list of stack frames for this task's coroutine.
111
Victor Stinnerd87de832014-12-02 17:57:04 +0100112 If the coroutine is not done, this returns the stack where it is
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700113 suspended. If the coroutine has completed successfully or was
114 cancelled, this returns an empty list. If the coroutine was
115 terminated by an exception, this returns the list of traceback
116 frames.
117
118 The frames are always ordered from oldest to newest.
119
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500120 The optional limit gives the maximum number of frames to
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700121 return; by default all available frames are returned. Its
122 meaning differs depending on whether a stack or a traceback is
123 returned: the newest frames of a stack are returned, but the
124 oldest frames of a traceback are returned. (This matches the
125 behavior of the traceback module.)
126
127 For reasons beyond our control, only one stack frame is
128 returned for a suspended coroutine.
129 """
130 frames = []
Yury Selivanov23398332015-08-14 15:30:59 -0400131 try:
132 # 'async def' coroutines
133 f = self._coro.cr_frame
134 except AttributeError:
135 f = self._coro.gi_frame
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700136 if f is not None:
137 while f is not None:
138 if limit is not None:
139 if limit <= 0:
140 break
141 limit -= 1
142 frames.append(f)
143 f = f.f_back
144 frames.reverse()
145 elif self._exception is not None:
146 tb = self._exception.__traceback__
147 while tb is not None:
148 if limit is not None:
149 if limit <= 0:
150 break
151 limit -= 1
152 frames.append(tb.tb_frame)
153 tb = tb.tb_next
154 return frames
155
156 def print_stack(self, *, limit=None, file=None):
157 """Print the stack or traceback for this task's coroutine.
158
159 This produces output similar to that of the traceback module,
160 for the frames retrieved by get_stack(). The limit argument
161 is passed to get_stack(). The file argument is an I/O stream
R David Murray8e069d52014-09-24 13:13:45 -0400162 to which the output is written; by default output is written
163 to sys.stderr.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700164 """
165 extracted_list = []
166 checked = set()
167 for f in self.get_stack(limit=limit):
168 lineno = f.f_lineno
169 co = f.f_code
170 filename = co.co_filename
171 name = co.co_name
172 if filename not in checked:
173 checked.add(filename)
174 linecache.checkcache(filename)
175 line = linecache.getline(filename, lineno, f.f_globals)
176 extracted_list.append((filename, lineno, name, line))
177 exc = self._exception
178 if not extracted_list:
179 print('No stack for %r' % self, file=file)
180 elif exc is not None:
181 print('Traceback for %r (most recent call last):' % self,
182 file=file)
183 else:
184 print('Stack for %r (most recent call last):' % self,
185 file=file)
186 traceback.print_list(extracted_list, file=file)
187 if exc is not None:
188 for line in traceback.format_exception_only(exc.__class__, exc):
189 print(line, file=file, end='')
190
191 def cancel(self):
R David Murray8e069d52014-09-24 13:13:45 -0400192 """Request that this task cancel itself.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200193
Victor Stinner8d213572014-06-02 23:06:46 +0200194 This arranges for a CancelledError to be thrown into the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200195 wrapped coroutine on the next cycle through the event loop.
196 The coroutine then has a chance to clean up or even deny
197 the request using try/except/finally.
198
R David Murray8e069d52014-09-24 13:13:45 -0400199 Unlike Future.cancel, this does not guarantee that the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200200 task will be cancelled: the exception might be caught and
R David Murray8e069d52014-09-24 13:13:45 -0400201 acted upon, delaying cancellation of the task or preventing
202 cancellation completely. The task may also return a value or
203 raise a different exception.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200204
205 Immediately after this method is called, Task.cancelled() will
206 not return True (unless the task was already cancelled). A
207 task will be marked as cancelled when the wrapped coroutine
208 terminates with a CancelledError exception (even if cancel()
209 was not called).
210 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700211 if self.done():
212 return False
213 if self._fut_waiter is not None:
214 if self._fut_waiter.cancel():
215 # Leave self._fut_waiter; it may be a Task that
216 # catches and ignores the cancellation so we may have
217 # to cancel it again later.
218 return True
219 # It must be the case that self._step is already scheduled.
220 self._must_cancel = True
221 return True
222
223 def _step(self, value=None, exc=None):
224 assert not self.done(), \
225 '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
226 if self._must_cancel:
227 if not isinstance(exc, futures.CancelledError):
228 exc = futures.CancelledError()
229 self._must_cancel = False
230 coro = self._coro
231 self._fut_waiter = None
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800232
233 self.__class__._current_tasks[self._loop] = self
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700234 # Call either coro.throw(exc) or coro.send(value).
235 try:
236 if exc is not None:
237 result = coro.throw(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700238 else:
Yury Selivanov1ad08a52015-05-28 10:52:19 -0400239 result = coro.send(value)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700240 except StopIteration as exc:
241 self.set_result(exc.value)
242 except futures.CancelledError as exc:
243 super().cancel() # I.e., Future.cancel(self).
244 except Exception as exc:
245 self.set_exception(exc)
246 except BaseException as exc:
247 self.set_exception(exc)
248 raise
249 else:
250 if isinstance(result, futures.Future):
251 # Yielded Future must come from Future.__iter__().
252 if result._blocking:
253 result._blocking = False
254 result.add_done_callback(self._wakeup)
255 self._fut_waiter = result
Yury Selivanov4c0a09a2015-08-02 16:49:31 -0400256 if self._must_cancel:
257 if self._fut_waiter.cancel():
258 self._must_cancel = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700259 else:
260 self._loop.call_soon(
261 self._step, None,
262 RuntimeError(
263 'yield was used instead of yield from '
264 'in task {!r} with {!r}'.format(self, result)))
265 elif result is None:
266 # Bare yield relinquishes control for one event loop iteration.
267 self._loop.call_soon(self._step)
268 elif inspect.isgenerator(result):
269 # Yielding a generator is just wrong.
270 self._loop.call_soon(
271 self._step, None,
272 RuntimeError(
273 'yield was used instead of yield from for '
274 'generator in task {!r} with {}'.format(
275 self, result)))
276 else:
277 # Yielding something else is an error.
278 self._loop.call_soon(
279 self._step, None,
280 RuntimeError(
281 'Task got bad yield: {!r}'.format(result)))
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800282 finally:
283 self.__class__._current_tasks.pop(self._loop)
Victor Stinnerd74ac822014-03-04 23:07:08 +0100284 self = None # Needed to break cycles when an exception occurs.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700285
286 def _wakeup(self, future):
287 try:
Yury Selivanova4afc482015-11-16 15:12:10 -0500288 future.result()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700289 except Exception as exc:
290 # This may also be a cancellation.
291 self._step(None, exc)
292 else:
Yury Selivanova4afc482015-11-16 15:12:10 -0500293 # Don't pass the value of `future.result()` explicitly,
294 # as `Future.__iter__` and `Future.__await__` don't need it.
295 # If we call `_step(value, None)` instead of `_step()`,
296 # Python eval loop would use `.send(value)` method call,
297 # instead of `__next__()`, which is slower for futures
298 # that return non-generator iterators from their `__iter__`.
299 self._step()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700300 self = None # Needed to break cycles when an exception occurs.
301
302
303# wait() and as_completed() similar to those in PEP 3148.
304
305FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
306FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
307ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
308
309
310@coroutine
311def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
312 """Wait for the Futures and coroutines given by fs to complete.
313
Victor Stinnerdb74d982014-06-10 11:16:05 +0200314 The sequence futures must not be empty.
315
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700316 Coroutines will be wrapped in Tasks.
317
318 Returns two sets of Future: (done, pending).
319
320 Usage:
321
322 done, pending = yield from asyncio.wait(fs)
323
324 Note: This does not raise TimeoutError! Futures that aren't done
325 when the timeout occurs are returned in the second set.
326 """
Victor Stinnerf951d282014-06-29 00:46:45 +0200327 if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100328 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700329 if not fs:
330 raise ValueError('Set of coroutines/Futures is empty.')
Victor Stinnere931f7b2014-07-16 18:50:39 +0200331 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
332 raise ValueError('Invalid return_when value: {}'.format(return_when))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700333
334 if loop is None:
335 loop = events.get_event_loop()
336
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400337 fs = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700338
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700339 return (yield from _wait(fs, timeout, return_when, loop))
340
341
Victor Stinner59e08022014-08-28 11:19:25 +0200342def _release_waiter(waiter, *args):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700343 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200344 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700345
346
347@coroutine
348def wait_for(fut, timeout, *, loop=None):
349 """Wait for the single Future or coroutine to complete, with timeout.
350
351 Coroutine will be wrapped in Task.
352
Victor Stinner421e49b2014-01-23 17:40:59 +0100353 Returns result of the Future or coroutine. When a timeout occurs,
354 it cancels the task and raises TimeoutError. To avoid the task
355 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700356
Victor Stinner922bc2c2015-01-15 16:29:10 +0100357 If the wait is cancelled, the task is also cancelled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700358
Victor Stinner922bc2c2015-01-15 16:29:10 +0100359 This function is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700360 """
361 if loop is None:
362 loop = events.get_event_loop()
363
Guido van Rossum48c66c32014-01-29 14:30:38 -0800364 if timeout is None:
365 return (yield from fut)
366
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700367 waiter = futures.Future(loop=loop)
Victor Stinner59e08022014-08-28 11:19:25 +0200368 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
369 cb = functools.partial(_release_waiter, waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700370
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400371 fut = ensure_future(fut, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700372 fut.add_done_callback(cb)
373
374 try:
Victor Stinner59e08022014-08-28 11:19:25 +0200375 # wait until the future completes or the timeout
Victor Stinner922bc2c2015-01-15 16:29:10 +0100376 try:
377 yield from waiter
378 except futures.CancelledError:
379 fut.remove_done_callback(cb)
380 fut.cancel()
381 raise
Victor Stinner59e08022014-08-28 11:19:25 +0200382
383 if fut.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700384 return fut.result()
385 else:
386 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100387 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700388 raise futures.TimeoutError()
389 finally:
390 timeout_handle.cancel()
391
392
393@coroutine
394def _wait(fs, timeout, return_when, loop):
395 """Internal helper for wait() and _wait_for().
396
397 The fs argument must be a collection of Futures.
398 """
399 assert fs, 'Set of Futures is empty.'
400 waiter = futures.Future(loop=loop)
401 timeout_handle = None
402 if timeout is not None:
403 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
404 counter = len(fs)
405
406 def _on_completion(f):
407 nonlocal counter
408 counter -= 1
409 if (counter <= 0 or
410 return_when == FIRST_COMPLETED or
411 return_when == FIRST_EXCEPTION and (not f.cancelled() and
412 f.exception() is not None)):
413 if timeout_handle is not None:
414 timeout_handle.cancel()
415 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200416 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700417
418 for f in fs:
419 f.add_done_callback(_on_completion)
420
421 try:
422 yield from waiter
423 finally:
424 if timeout_handle is not None:
425 timeout_handle.cancel()
426
427 done, pending = set(), set()
428 for f in fs:
429 f.remove_done_callback(_on_completion)
430 if f.done():
431 done.add(f)
432 else:
433 pending.add(f)
434 return done, pending
435
436
437# This is *not* a @coroutine! It is just an iterator (yielding Futures).
438def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800439 """Return an iterator whose values are coroutines.
440
441 When waiting for the yielded coroutines you'll get the results (or
442 exceptions!) of the original Futures (or coroutines), in the order
443 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700444
445 This differs from PEP 3148; the proper way to use this is:
446
447 for f in as_completed(fs):
448 result = yield from f # The 'yield from' may raise.
449 # Use result.
450
Guido van Rossumb58f0532014-02-12 17:58:19 -0800451 If a timeout is specified, the 'yield from' will raise
452 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700453
454 Note: The futures 'f' are not necessarily members of fs.
455 """
Victor Stinnerf951d282014-06-29 00:46:45 +0200456 if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100457 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700458 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400459 todo = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800460 from .queues import Queue # Import here to avoid circular import problem.
461 done = Queue(loop=loop)
462 timeout_handle = None
463
464 def _on_timeout():
465 for f in todo:
466 f.remove_done_callback(_on_completion)
467 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
468 todo.clear() # Can't do todo.remove(f) in the loop.
469
470 def _on_completion(f):
471 if not todo:
472 return # _on_timeout() was here first.
473 todo.remove(f)
474 done.put_nowait(f)
475 if not todo and timeout_handle is not None:
476 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700477
478 @coroutine
479 def _wait_for_one():
Guido van Rossumb58f0532014-02-12 17:58:19 -0800480 f = yield from done.get()
481 if f is None:
482 # Dummy value from _on_timeout().
483 raise futures.TimeoutError
484 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700485
Guido van Rossumb58f0532014-02-12 17:58:19 -0800486 for f in todo:
487 f.add_done_callback(_on_completion)
488 if todo and timeout is not None:
489 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700490 for _ in range(len(todo)):
491 yield _wait_for_one()
492
493
494@coroutine
495def sleep(delay, result=None, *, loop=None):
496 """Coroutine that completes after a given time (in seconds)."""
Yury Selivanovade04122015-11-05 14:29:04 -0500497 if delay == 0:
498 yield
499 return result
500
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700501 future = futures.Future(loop=loop)
Victor Stinnera9acbe82014-07-05 15:29:41 +0200502 h = future._loop.call_later(delay,
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500503 futures._set_result_unless_cancelled,
504 future, result)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700505 try:
506 return (yield from future)
507 finally:
508 h.cancel()
509
510
511def async(coro_or_future, *, loop=None):
512 """Wrap a coroutine in a future.
513
514 If the argument is a Future, it is returned directly.
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400515
516 This function is deprecated in 3.5. Use asyncio.ensure_future() instead.
517 """
518
519 warnings.warn("asyncio.async() function is deprecated, use ensure_future()",
520 DeprecationWarning)
521
522 return ensure_future(coro_or_future, loop=loop)
523
524
525def ensure_future(coro_or_future, *, loop=None):
Yury Selivanov620279b2015-10-02 15:00:19 -0400526 """Wrap a coroutine or an awaitable in a future.
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400527
528 If the argument is a Future, it is returned directly.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700529 """
530 if isinstance(coro_or_future, futures.Future):
531 if loop is not None and loop is not coro_or_future._loop:
532 raise ValueError('loop argument must agree with Future')
533 return coro_or_future
Victor Stinnerf951d282014-06-29 00:46:45 +0200534 elif coroutines.iscoroutine(coro_or_future):
Victor Stinner896a25a2014-07-08 11:29:25 +0200535 if loop is None:
536 loop = events.get_event_loop()
537 task = loop.create_task(coro_or_future)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200538 if task._source_traceback:
539 del task._source_traceback[-1]
540 return task
Yury Selivanov620279b2015-10-02 15:00:19 -0400541 elif compat.PY35 and inspect.isawaitable(coro_or_future):
542 return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700543 else:
Yury Selivanov620279b2015-10-02 15:00:19 -0400544 raise TypeError('A Future, a coroutine or an awaitable is required')
545
546
547@coroutine
548def _wrap_awaitable(awaitable):
549 """Helper for asyncio.ensure_future().
550
551 Wraps awaitable (an object with __await__) into a coroutine
552 that will later be wrapped in a Task by ensure_future().
553 """
554 return (yield from awaitable.__await__())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700555
556
557class _GatheringFuture(futures.Future):
558 """Helper for gather().
559
560 This overrides cancel() to cancel all the children and act more
561 like Task.cancel(), which doesn't immediately mark itself as
562 cancelled.
563 """
564
565 def __init__(self, children, *, loop=None):
566 super().__init__(loop=loop)
567 self._children = children
568
569 def cancel(self):
570 if self.done():
571 return False
572 for child in self._children:
573 child.cancel()
574 return True
575
576
577def gather(*coros_or_futures, loop=None, return_exceptions=False):
578 """Return a future aggregating results from the given coroutines
579 or futures.
580
581 All futures must share the same event loop. If all the tasks are
582 done successfully, the returned future's result is the list of
583 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500584 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700585 exceptions in the tasks are treated the same as successful
586 results, and gathered in the result list; otherwise, the first
587 raised exception will be immediately propagated to the returned
588 future.
589
590 Cancellation: if the outer Future is cancelled, all children (that
591 have not completed yet) are also cancelled. If any child is
592 cancelled, this is treated as if it raised CancelledError --
593 the outer Future is *not* cancelled in this case. (This is to
594 prevent the cancellation of one child to cause other children to
595 be cancelled.)
596 """
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200597 if not coros_or_futures:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700598 outer = futures.Future(loop=loop)
599 outer.set_result([])
600 return outer
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200601
602 arg_to_fut = {}
603 for arg in set(coros_or_futures):
604 if not isinstance(arg, futures.Future):
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400605 fut = ensure_future(arg, loop=loop)
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200606 if loop is None:
607 loop = fut._loop
608 # The caller cannot control this future, the "destroy pending task"
609 # warning should not be emitted.
610 fut._log_destroy_pending = False
611 else:
612 fut = arg
613 if loop is None:
614 loop = fut._loop
615 elif fut._loop is not loop:
616 raise ValueError("futures are tied to different event loops")
617 arg_to_fut[arg] = fut
618
619 children = [arg_to_fut[arg] for arg in coros_or_futures]
620 nchildren = len(children)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700621 outer = _GatheringFuture(children, loop=loop)
622 nfinished = 0
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200623 results = [None] * nchildren
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700624
625 def _done_callback(i, fut):
626 nonlocal nfinished
Victor Stinner3531d902015-01-09 01:42:52 +0100627 if outer.done():
628 if not fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700629 # Mark exception retrieved.
630 fut.exception()
631 return
Victor Stinner3531d902015-01-09 01:42:52 +0100632
Victor Stinner29342622015-01-29 14:15:19 +0100633 if fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700634 res = futures.CancelledError()
635 if not return_exceptions:
636 outer.set_exception(res)
637 return
638 elif fut._exception is not None:
639 res = fut.exception() # Mark exception retrieved.
640 if not return_exceptions:
641 outer.set_exception(res)
642 return
643 else:
644 res = fut._result
645 results[i] = res
646 nfinished += 1
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200647 if nfinished == nchildren:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700648 outer.set_result(results)
649
650 for i, fut in enumerate(children):
651 fut.add_done_callback(functools.partial(_done_callback, i))
652 return outer
653
654
655def shield(arg, *, loop=None):
656 """Wait for a future, shielding it from cancellation.
657
658 The statement
659
660 res = yield from shield(something())
661
662 is exactly equivalent to the statement
663
664 res = yield from something()
665
666 *except* that if the coroutine containing it is cancelled, the
667 task running in something() is not cancelled. From the POV of
668 something(), the cancellation did not happen. But its caller is
669 still cancelled, so the yield-from expression still raises
670 CancelledError. Note: If something() is cancelled by other means
671 this will still cancel shield().
672
673 If you want to completely ignore cancellation (not recommended)
674 you can combine shield() with a try/except clause, as follows:
675
676 try:
677 res = yield from shield(something())
678 except CancelledError:
679 res = None
680 """
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400681 inner = ensure_future(arg, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700682 if inner.done():
683 # Shortcut.
684 return inner
685 loop = inner._loop
686 outer = futures.Future(loop=loop)
687
688 def _done_callback(inner):
689 if outer.cancelled():
Victor Stinner3531d902015-01-09 01:42:52 +0100690 if not inner.cancelled():
691 # Mark inner's result as retrieved.
692 inner.exception()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700693 return
Victor Stinner3531d902015-01-09 01:42:52 +0100694
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700695 if inner.cancelled():
696 outer.cancel()
697 else:
698 exc = inner.exception()
699 if exc is not None:
700 outer.set_exception(exc)
701 else:
702 outer.set_result(inner.result())
703
704 inner.add_done_callback(_done_callback)
705 return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700706
707
708def run_coroutine_threadsafe(coro, loop):
709 """Submit a coroutine object to a given event loop.
710
711 Return a concurrent.futures.Future to access the result.
712 """
713 if not coroutines.iscoroutine(coro):
714 raise TypeError('A coroutine object is required')
715 future = concurrent.futures.Future()
716
717 def callback():
Guido van Rossum601953b2015-10-05 16:20:00 -0700718 try:
719 futures._chain_future(ensure_future(coro, loop=loop), future)
720 except Exception as exc:
721 if future.set_running_or_notify_cancel():
722 future.set_exception(exc)
723 raise
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700724
725 loop.call_soon_threadsafe(callback)
726 return future