blob: 5a43ef257f65f3cd2d59f7e722c5c4a80c6d50c8 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
# Names exported by a star-import of this module.  Note that 'async' is
# listed here even though no "def async" exists: the function is defined
# under a different name further down and installed into the module
# globals under the 'async' key.
__all__ = ['Task',
           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
           'wait', 'wait_for', 'as_completed', 'sleep', 'async',
           'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
           ]
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import concurrent.futures
10import functools
11import inspect
Yury Selivanov59eb9a42015-05-11 14:48:38 -040012import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013import weakref
14
Yury Selivanova0c1ba62016-10-28 12:52:37 -040015from . import base_tasks
Victor Stinner71080fc2015-07-25 02:23:21 +020016from . import compat
Victor Stinnerf951d282014-06-29 00:46:45 +020017from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import events
19from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020020from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
class Task(futures.Future):
    """A coroutine wrapped in a Future.

    The wrapped coroutine is driven by _step(): it is scheduled once from
    __init__() with loop.call_soon(), and afterwards either re-scheduled
    directly or resumed through _wakeup() when the awaited future
    completes.  The outcome of the coroutine (return value, exception or
    cancellation) becomes the outcome of the Task.
    """

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # Weak set containing all tasks alive.
    _all_tasks = weakref.WeakSet()

    # Dictionary containing tasks that are currently active in
    # all running event loops.  {EventLoop: Task}
    _current_tasks = {}

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        if loop is None:
            loop = events.get_event_loop()
        return cls._current_tasks.get(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        if loop is None:
            loop = events.get_event_loop()
        return {t for t in cls._all_tasks if t._loop is loop}

    def __init__(self, coro, *, loop=None):
        """Wrap the coroutine object *coro* and schedule its first step.

        *loop* is forwarded to the Future constructor.
        """
        assert coroutines.iscoroutine(coro), repr(coro)
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the last recorded frame (presumably the one for this
            # constructor call, which is of no interest to the user).
            del self._source_traceback[-1]
        self._coro = coro
        self._fut_waiter = None
        self._must_cancel = False
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    # On Python 3.3 or older, objects with a destructor that are part of a
    # reference cycle are never destroyed. That's not the case any more on
    # Python 3.4 thanks to the PEP 442.
    if compat.PY34:
        def __del__(self):
            # Report tasks that are garbage collected while still pending,
            # unless logging was disabled through _log_destroy_pending.
            if self._state == futures._PENDING and self._log_destroy_pending:
                context = {
                    'task': self,
                    'message': 'Task was destroyed but it is pending!',
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
            futures.Future.__del__(self)

    def _repr_info(self):
        return base_tasks._task_repr_info(self)

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, exc=None):
        """Advance the wrapped coroutine by one step.

        If *exc* is None the coroutine is resumed with send(None),
        otherwise *exc* is thrown into it.  The value yielded by the
        coroutine decides how the next step gets scheduled; a finished
        coroutine sets the result, exception or cancelled state of the
        task.
        """
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}'.format(self, exc)
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # cancel() was called while the coroutine was running
                # (so it only set _must_cancel) and the coroutine then
                # returned without yielding again.  cancel() already
                # reported success, so honour it instead of silently
                # publishing the return value.
                self._must_cancel = False
                super().cancel()  # I.e., Future.cancel(self).
            else:
                self.set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            self.set_exception(exc)
            raise
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if result._loop is not self._loop:
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'Task {!r} got Future {!r} attached to a '
                            'different loop'.format(self, result)))
                elif blocking:
                    if result is self:
                        self._loop.call_soon(
                            self._step,
                            RuntimeError(
                                'Task cannot await on itself: {!r}'.format(
                                    self)))
                    else:
                        result._asyncio_future_blocking = False
                        result.add_done_callback(self._wakeup)
                        self._fut_waiter = result
                        if self._must_cancel:
                            # A cancel() arrived during this step: forward
                            # it to the future we just started waiting on.
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        """Done-callback of the awaited future: resume the coroutine.

        An exception raised by future.result() is thrown into the
        coroutine; otherwise the coroutine is simply resumed.
        """
        try:
            future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self._step()
        self = None  # Needed to break cycles when an exception occurs.
257
258
# Keep the Task class defined above reachable under a private name,
# because the public name may be rebound just below.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    # The optional _asyncio module is not available: keep using the
    # Task class defined in this file.
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task
269
270
# wait() and as_completed() similar to those in PEP 3148.

# Values accepted for the return_when argument of wait(); they are the
# very same objects as the constants of concurrent.futures.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
276
277
@coroutine
def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures and coroutines given by fs to complete.

    The sequence futures must not be empty.

    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = yield from asyncio.wait(fs)

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.

    Raises TypeError if fs is itself a future or a coroutine, and
    ValueError if fs is empty or return_when is not one of
    FIRST_COMPLETED, FIRST_EXCEPTION or ALL_COMPLETED.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError('Invalid return_when value: {}'.format(return_when))

    if loop is None:
        loop = events.get_event_loop()

    fs = {ensure_future(f, loop=loop) for f in set(fs)}
    if not fs:
        # One-shot iterables (generators, map objects, ...) are always
        # truthy, so the check above cannot detect that they are empty.
        # Without this second check _wait() would trip its assertion or,
        # with assertions disabled and no timeout, never be woken up.
        raise ValueError('Set of coroutines/Futures is empty.')

    return (yield from _wait(fs, timeout, return_when, loop))
308
309
def _release_waiter(waiter, *args):
    """Resolve *waiter* with None unless it is already done.

    Any extra positional arguments are accepted and ignored.
    """
    if waiter.done():
        return
    waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700313
314
@coroutine
def wait_for(fut, timeout, *, loop=None):
    """Wait for a single Future or coroutine to complete, with a timeout.

    A coroutine is wrapped in a Task.

    The result of the Future or coroutine is returned.  If the timeout
    expires first, the task is cancelled and TimeoutError is raised; wrap
    the argument in shield() to avoid that cancellation.

    If the wait itself is cancelled, the task is cancelled as well.

    When timeout is None this simply waits on *fut* directly.

    This function is a coroutine.
    """
    if loop is None:
        loop = events.get_event_loop()

    if timeout is None:
        outcome = yield from fut
        return outcome

    waiter = loop.create_future()
    # The waiter is released either by the timer or by fut completing,
    # whichever happens first.
    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    on_done = functools.partial(_release_waiter, waiter)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(on_done)

    def _abandon():
        # Stop listening to the inner future and ask it to cancel.
        fut.remove_done_callback(on_done)
        fut.cancel()

    try:
        # wait until the future completes or the timeout
        try:
            yield from waiter
        except futures.CancelledError:
            _abandon()
            raise

        if not fut.done():
            # The timer fired first.
            _abandon()
            raise futures.TimeoutError()
        return fut.result()
    finally:
        timeout_handle.cancel()
359
360
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait() and wait_for().

    The fs argument must be a collection of Futures.

    Returns the pair of sets (done, pending).
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    if timeout is None:
        timeout_handle = None
    else:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    remaining = len(fs)

    def _on_completion(f):
        nonlocal remaining
        remaining -= 1
        if remaining > 0 and return_when != FIRST_COMPLETED:
            # Some futures are still running and we are not in
            # first-completed mode: only a failure in first-exception
            # mode may end the wait early.
            if return_when != FIRST_EXCEPTION:
                return
            if f.cancelled() or f.exception() is None:
                return
        if timeout_handle is not None:
            timeout_handle.cancel()
        if not waiter.done():
            waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()

    done = set()
    pending = set()
    for f in fs:
        f.remove_done_callback(_on_completion)
        bucket = done if f.done() else pending
        bucket.add(f)
    return done, pending
403
404
# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    Waiting for each yielded coroutine gives the result (or raises the
    exception!) of one of the original Futures (or coroutines), in the
    order in which they complete and as soon as they do.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = yield from f  # The 'yield from' may raise.
            # Use result.

    If a timeout is specified, the 'yield from' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
    if loop is None:
        loop = events.get_event_loop()
    todo = {ensure_future(f, loop=loop) for f in set(fs)}
    from .queues import Queue  # Import here to avoid circular import problem.
    done = Queue(loop=loop)
    timeout_handle = None

    def _on_timeout():
        for unfinished in todo:
            unfinished.remove_done_callback(_on_completion)
            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
        todo.clear()  # Can't do todo.remove(f) in the loop.

    def _on_completion(finished):
        if not todo:
            return  # _on_timeout() was here first.
        todo.remove(finished)
        done.put_nowait(finished)
        if not todo and timeout_handle is not None:
            timeout_handle.cancel()

    @coroutine
    def _wait_for_one():
        item = yield from done.get()
        if item is None:
            # Dummy value from _on_timeout().
            raise futures.TimeoutError
        return item.result()  # May raise f.exception().

    for f in todo:
        f.add_done_callback(_on_completion)
    if todo and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    # The number of coroutines handed out is fixed up front; todo itself
    # shrinks as the callbacks above run.
    for _ in range(len(todo)):
        yield _wait_for_one()
460
461
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    *result* is returned to the caller once the delay has elapsed.  A
    delay equal to zero yields once and returns without creating a timer.
    """
    if delay == 0:
        yield
        return result

    if loop is None:
        loop = events.get_event_loop()
    future = loop.create_future()
    timer = future._loop.call_later(
        delay, futures._set_result_unless_cancelled, future, result)
    try:
        value = yield from future
        return value
    finally:
        # Also covers the case where the wait is interrupted before the
        # timer fires.
        timer.cancel()
479
480
def async_(coro_or_future, *, loop=None):
    """Wrap a coroutine in a future.

    If the argument is a Future, it is returned directly.

    This function is deprecated in 3.5. Use asyncio.ensure_future() instead.
    """

    # stacklevel=2 attributes the warning to the code calling this
    # function rather than to this module, so users can find the call
    # site that needs updating.
    warnings.warn("asyncio.async() function is deprecated, use ensure_future()",
                  DeprecationWarning,
                  stacklevel=2)

    return ensure_future(coro_or_future, loop=loop)

# Silence DeprecationWarning:
# the function is installed under the name 'async' through globals()
# instead of being defined with "def async", and the temporary name is
# removed again.
globals()['async'] = async_
async_.__name__ = 'async'
del async_
498
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400499
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.

    Raises ValueError if a Future is given together with a loop that is
    not the Future's own loop, and TypeError if the argument is neither
    a Future, a coroutine nor an awaitable.
    """
    if futures.isfuture(coro_or_future):
        if not (loop is None or loop is coro_or_future._loop):
            raise ValueError('loop argument must agree with Future')
        return coro_or_future

    if coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task

    if compat.PY35 and inspect.isawaitable(coro_or_future):
        # Turn the awaitable into a coroutine first, then wrap that.
        wrapped = _wrap_awaitable(coro_or_future)
        return ensure_future(wrapped, loop=loop)

    raise TypeError('A Future, a coroutine or an awaitable is required')
520
521
@coroutine
def _wrap_awaitable(awaitable):
    """Helper for asyncio.ensure_future().

    Turns an awaitable (an object with __await__) into a coroutine,
    which ensure_future() then wraps in a Task.
    """
    value = yield from awaitable.__await__()
    return value
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700530
531
class _GatheringFuture(futures.Future):
    """Helper for gather().

    cancel() is overridden so that it cancels all the children instead
    of this future itself; like Task.cancel(), it therefore does not
    immediately mark the future as cancelled.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children

    def cancel(self):
        """Cancel every child; return True if any child accepted it.

        Returns False without touching the children when this future is
        already done.
        """
        if self.done():
            return False
        # Build the full list first: every child must be asked to cancel,
        # so there must be no short-circuiting.
        outcomes = [child.cancel() for child in self._children]
        return any(outcomes)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700552
553
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines
    or futures.

    Coroutines will be wrapped in a future and scheduled in the event
    loop. They will not necessarily be scheduled in the same order as
    passed in.

    All futures must share the same event loop.  When every child
    finishes successfully, the result of the returned future is the list
    of their results, in the order of the arguments (not in the order of
    completion).  With *return_exceptions* set to True, exceptions raised
    by the children are stored in that list like ordinary results;
    otherwise the first exception raised is immediately set on the
    returned future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)
    """
    if not coros_or_futures:
        if loop is None:
            loop = events.get_event_loop()
        outer = loop.create_future()
        outer.set_result([])
        return outer

    # Each distinct argument is wrapped only once, even if it was passed
    # several times.
    arg_to_fut = {}
    for arg in set(coros_or_futures):
        if futures.isfuture(arg):
            fut = arg
            if loop is None:
                loop = fut._loop
            elif fut._loop is not loop:
                raise ValueError("futures are tied to different event loops")
        else:
            fut = ensure_future(arg, loop=loop)
            if loop is None:
                loop = fut._loop
            # The caller cannot control this future, the "destroy pending task"
            # warning should not be emitted.
            fut._log_destroy_pending = False
        arg_to_fut[arg] = fut

    children = [arg_to_fut[arg] for arg in coros_or_futures]
    nchildren = len(children)
    outer = _GatheringFuture(children, loop=loop)
    nfinished = 0
    results = [None] * nchildren

    def _done_callback(index, child):
        nonlocal nfinished
        if outer.done():
            if not child.cancelled():
                # Mark exception retrieved.
                child.exception()
            return

        if child.cancelled():
            outcome = futures.CancelledError()
            failed = True
        elif child._exception is not None:
            outcome = child.exception()  # Mark exception retrieved.
            failed = True
        else:
            outcome = child._result
            failed = False

        if failed and not return_exceptions:
            outer.set_exception(outcome)
            return

        results[index] = outcome
        nfinished += 1
        if nfinished == nchildren:
            outer.set_result(results)

    for index, child in enumerate(children):
        child.add_done_callback(functools.partial(_done_callback, index))
    return outer
636
637
def shield(arg, *, loop=None):
    """Wait for a future, shielding it from cancellation.

    The statement

        res = yield from shield(something())

    is exactly equivalent to the statement

        res = yield from something()

    *except* that if the coroutine containing it is cancelled, the
    task running in something() is not cancelled.  From the POV of
    something(), the cancellation did not happen.  But its caller is
    still cancelled, so the yield-from expression still raises
    CancelledError.  Note: If something() is cancelled by other means
    this will still cancel shield().

    If you want to completely ignore cancellation (not recommended)
    you can combine shield() with a try/except clause, as follows:

        try:
            res = yield from shield(something())
        except CancelledError:
            res = None
    """
    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Shortcut.
        return inner
    loop = inner._loop
    outer = loop.create_future()

    def _done_callback(inner):
        if outer.cancelled():
            if not inner.cancelled():
                # Mark inner's result as retrieved.
                inner.exception()
            return

        # Mirror the outcome of the inner future on the outer one.
        if inner.cancelled():
            outer.cancel()
            return
        exc = inner.exception()
        if exc is None:
            outer.set_result(inner.result())
        else:
            outer.set_exception(exc)

    inner.add_done_callback(_done_callback)
    return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700689
690
def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    Return a concurrent.futures.Future to access the result.

    Raises TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    result_future = concurrent.futures.Future()

    def callback():
        # Scheduled on the loop with call_soon_threadsafe(): wrap the
        # coroutine and chain its outcome to result_future.
        try:
            inner = ensure_future(coro, loop=loop)
            futures._chain_future(inner, result_future)
        except Exception as exc:
            # Report the failure through the returned future as well,
            # unless that future has been cancelled meanwhile.
            if result_future.set_running_or_notify_cancel():
                result_future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return result_future