blob: 4d79367d5cb600ea31fdfce5cbcd2a40c2e8aeeb [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
Victor Stinnerf951d282014-06-29 00:46:45 +02003__all__ = ['Task',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
5 'wait', 'wait_for', 'as_completed', 'sleep', 'async',
Guido van Rossum841d9ee2015-10-03 08:31:42 -07006 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007 ]
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import concurrent.futures
10import functools
11import inspect
Yury Selivanov59eb9a42015-05-11 14:48:38 -040012import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013import weakref
14
Yury Selivanova0c1ba62016-10-28 12:52:37 -040015from . import base_tasks
Victor Stinner71080fc2015-07-25 02:23:21 +020016from . import compat
Victor Stinnerf951d282014-06-29 00:46:45 +020017from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import events
19from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020020from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023class Task(futures.Future):
24 """A coroutine wrapped in a Future."""
25
26 # An important invariant maintained while a Task not done:
27 #
28 # - Either _fut_waiter is None, and _step() is scheduled;
29 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
30 #
31 # The only transition from the latter to the former is through
32 # _wakeup(). When _fut_waiter is not None, one of its callbacks
33 # must be _wakeup().
34
35 # Weak set containing all tasks alive.
36 _all_tasks = weakref.WeakSet()
37
Guido van Rossum1a605ed2013-12-06 12:57:40 -080038 # Dictionary containing tasks that are currently active in
39 # all running event loops. {EventLoop: Task}
40 _current_tasks = {}
41
Victor Stinnerfe22e092014-12-04 23:00:13 +010042 # If False, don't log a message if the task is destroyed whereas its
43 # status is still pending
44 _log_destroy_pending = True
45
Guido van Rossum1a605ed2013-12-06 12:57:40 -080046 @classmethod
47 def current_task(cls, loop=None):
48 """Return the currently running task in an event loop or None.
49
50 By default the current task for the current event loop is returned.
51
52 None is returned when called not in the context of a Task.
53 """
54 if loop is None:
55 loop = events.get_event_loop()
56 return cls._current_tasks.get(loop)
57
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070058 @classmethod
59 def all_tasks(cls, loop=None):
60 """Return a set of all tasks for an event loop.
61
62 By default all tasks for the current event loop are returned.
63 """
64 if loop is None:
65 loop = events.get_event_loop()
66 return {t for t in cls._all_tasks if t._loop is loop}
67
68 def __init__(self, coro, *, loop=None):
Victor Stinner15cc6782015-01-09 00:09:10 +010069 assert coroutines.iscoroutine(coro), repr(coro)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070070 super().__init__(loop=loop)
Victor Stinner80f53aa2014-06-27 13:52:20 +020071 if self._source_traceback:
72 del self._source_traceback[-1]
Yury Selivanov1ad08a52015-05-28 10:52:19 -040073 self._coro = coro
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070074 self._fut_waiter = None
75 self._must_cancel = False
76 self._loop.call_soon(self._step)
77 self.__class__._all_tasks.add(self)
78
R David Murray8e069d52014-09-24 13:13:45 -040079 # On Python 3.3 or older, objects with a destructor that are part of a
80 # reference cycle are never destroyed. That's not the case any more on
81 # Python 3.4 thanks to the PEP 442.
Victor Stinner71080fc2015-07-25 02:23:21 +020082 if compat.PY34:
Victor Stinnera02f81f2014-06-24 22:37:53 +020083 def __del__(self):
Victor Stinner98b63912014-06-30 14:51:04 +020084 if self._state == futures._PENDING and self._log_destroy_pending:
Victor Stinner80f53aa2014-06-27 13:52:20 +020085 context = {
Victor Stinnera02f81f2014-06-24 22:37:53 +020086 'task': self,
87 'message': 'Task was destroyed but it is pending!',
Victor Stinner80f53aa2014-06-27 13:52:20 +020088 }
89 if self._source_traceback:
90 context['source_traceback'] = self._source_traceback
91 self._loop.call_exception_handler(context)
Victor Stinnera02f81f2014-06-24 22:37:53 +020092 futures.Future.__del__(self)
93
Victor Stinner313a9802014-07-29 12:58:23 +020094 def _repr_info(self):
Yury Selivanova0c1ba62016-10-28 12:52:37 -040095 return base_tasks._task_repr_info(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070096
97 def get_stack(self, *, limit=None):
98 """Return the list of stack frames for this task's coroutine.
99
Victor Stinnerd87de832014-12-02 17:57:04 +0100100 If the coroutine is not done, this returns the stack where it is
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700101 suspended. If the coroutine has completed successfully or was
102 cancelled, this returns an empty list. If the coroutine was
103 terminated by an exception, this returns the list of traceback
104 frames.
105
106 The frames are always ordered from oldest to newest.
107
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500108 The optional limit gives the maximum number of frames to
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700109 return; by default all available frames are returned. Its
110 meaning differs depending on whether a stack or a traceback is
111 returned: the newest frames of a stack are returned, but the
112 oldest frames of a traceback are returned. (This matches the
113 behavior of the traceback module.)
114
115 For reasons beyond our control, only one stack frame is
116 returned for a suspended coroutine.
117 """
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400118 return base_tasks._task_get_stack(self, limit)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700119
120 def print_stack(self, *, limit=None, file=None):
121 """Print the stack or traceback for this task's coroutine.
122
123 This produces output similar to that of the traceback module,
124 for the frames retrieved by get_stack(). The limit argument
125 is passed to get_stack(). The file argument is an I/O stream
R David Murray8e069d52014-09-24 13:13:45 -0400126 to which the output is written; by default output is written
127 to sys.stderr.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700128 """
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400129 return base_tasks._task_print_stack(self, limit, file)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700130
131 def cancel(self):
R David Murray8e069d52014-09-24 13:13:45 -0400132 """Request that this task cancel itself.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200133
Victor Stinner8d213572014-06-02 23:06:46 +0200134 This arranges for a CancelledError to be thrown into the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200135 wrapped coroutine on the next cycle through the event loop.
136 The coroutine then has a chance to clean up or even deny
137 the request using try/except/finally.
138
R David Murray8e069d52014-09-24 13:13:45 -0400139 Unlike Future.cancel, this does not guarantee that the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200140 task will be cancelled: the exception might be caught and
R David Murray8e069d52014-09-24 13:13:45 -0400141 acted upon, delaying cancellation of the task or preventing
142 cancellation completely. The task may also return a value or
143 raise a different exception.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200144
145 Immediately after this method is called, Task.cancelled() will
146 not return True (unless the task was already cancelled). A
147 task will be marked as cancelled when the wrapped coroutine
148 terminates with a CancelledError exception (even if cancel()
149 was not called).
150 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700151 if self.done():
152 return False
153 if self._fut_waiter is not None:
154 if self._fut_waiter.cancel():
155 # Leave self._fut_waiter; it may be a Task that
156 # catches and ignores the cancellation so we may have
157 # to cancel it again later.
158 return True
159 # It must be the case that self._step is already scheduled.
160 self._must_cancel = True
161 return True
162
Yury Selivanovd59bba82015-11-20 12:41:03 -0500163 def _step(self, exc=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700164 assert not self.done(), \
Yury Selivanovd59bba82015-11-20 12:41:03 -0500165 '_step(): already done: {!r}, {!r}'.format(self, exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700166 if self._must_cancel:
167 if not isinstance(exc, futures.CancelledError):
168 exc = futures.CancelledError()
169 self._must_cancel = False
170 coro = self._coro
171 self._fut_waiter = None
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800172
173 self.__class__._current_tasks[self._loop] = self
Yury Selivanovd59bba82015-11-20 12:41:03 -0500174 # Call either coro.throw(exc) or coro.send(None).
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700175 try:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500176 if exc is None:
177 # We use the `send` method directly, because coroutines
178 # don't have `__iter__` and `__next__` methods.
179 result = coro.send(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700180 else:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500181 result = coro.throw(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700182 except StopIteration as exc:
183 self.set_result(exc.value)
Yury Selivanov4145c832016-10-09 12:19:12 -0400184 except futures.CancelledError:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700185 super().cancel() # I.e., Future.cancel(self).
186 except Exception as exc:
187 self.set_exception(exc)
188 except BaseException as exc:
189 self.set_exception(exc)
190 raise
191 else:
Guido van Rossum1140a032016-09-09 12:54:54 -0700192 blocking = getattr(result, '_asyncio_future_blocking', None)
193 if blocking is not None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700194 # Yielded Future must come from Future.__iter__().
Yury Selivanov0ac3a0c2015-12-11 11:33:59 -0500195 if result._loop is not self._loop:
196 self._loop.call_soon(
197 self._step,
198 RuntimeError(
199 'Task {!r} got Future {!r} attached to a '
200 'different loop'.format(self, result)))
Guido van Rossum1140a032016-09-09 12:54:54 -0700201 elif blocking:
Yury Selivanov4145c832016-10-09 12:19:12 -0400202 if result is self:
203 self._loop.call_soon(
204 self._step,
205 RuntimeError(
206 'Task cannot await on itself: {!r}'.format(
207 self)))
208 else:
209 result._asyncio_future_blocking = False
210 result.add_done_callback(self._wakeup)
211 self._fut_waiter = result
212 if self._must_cancel:
213 if self._fut_waiter.cancel():
214 self._must_cancel = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700215 else:
216 self._loop.call_soon(
Yury Selivanovd59bba82015-11-20 12:41:03 -0500217 self._step,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700218 RuntimeError(
219 'yield was used instead of yield from '
220 'in task {!r} with {!r}'.format(self, result)))
221 elif result is None:
222 # Bare yield relinquishes control for one event loop iteration.
223 self._loop.call_soon(self._step)
224 elif inspect.isgenerator(result):
225 # Yielding a generator is just wrong.
226 self._loop.call_soon(
Yury Selivanovd59bba82015-11-20 12:41:03 -0500227 self._step,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700228 RuntimeError(
229 'yield was used instead of yield from for '
230 'generator in task {!r} with {}'.format(
231 self, result)))
232 else:
233 # Yielding something else is an error.
234 self._loop.call_soon(
Yury Selivanovd59bba82015-11-20 12:41:03 -0500235 self._step,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700236 RuntimeError(
237 'Task got bad yield: {!r}'.format(result)))
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800238 finally:
239 self.__class__._current_tasks.pop(self._loop)
Victor Stinnerd74ac822014-03-04 23:07:08 +0100240 self = None # Needed to break cycles when an exception occurs.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700241
242 def _wakeup(self, future):
243 try:
Yury Selivanova4afc482015-11-16 15:12:10 -0500244 future.result()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700245 except Exception as exc:
246 # This may also be a cancellation.
Yury Selivanovd59bba82015-11-20 12:41:03 -0500247 self._step(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700248 else:
Yury Selivanova4afc482015-11-16 15:12:10 -0500249 # Don't pass the value of `future.result()` explicitly,
250 # as `Future.__iter__` and `Future.__await__` don't need it.
251 # If we call `_step(value, None)` instead of `_step()`,
252 # Python eval loop would use `.send(value)` method call,
253 # instead of `__next__()`, which is slower for futures
254 # that return non-generator iterators from their `__iter__`.
255 self._step()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700256 self = None # Needed to break cycles when an exception occurs.
257
258
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400259_PyTask = Task
260
261
262try:
263 import _asyncio
264except ImportError:
265 pass
266else:
267 # _CTask is needed for tests.
268 Task = _CTask = _asyncio.Task
269
270
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700271# wait() and as_completed() similar to those in PEP 3148.
272
273FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
274FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
275ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
276
277
278@coroutine
279def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
280 """Wait for the Futures and coroutines given by fs to complete.
281
Victor Stinnerdb74d982014-06-10 11:16:05 +0200282 The sequence futures must not be empty.
283
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700284 Coroutines will be wrapped in Tasks.
285
286 Returns two sets of Future: (done, pending).
287
288 Usage:
289
290 done, pending = yield from asyncio.wait(fs)
291
292 Note: This does not raise TimeoutError! Futures that aren't done
293 when the timeout occurs are returned in the second set.
294 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700295 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100296 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700297 if not fs:
298 raise ValueError('Set of coroutines/Futures is empty.')
Victor Stinnere931f7b2014-07-16 18:50:39 +0200299 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
300 raise ValueError('Invalid return_when value: {}'.format(return_when))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700301
302 if loop is None:
303 loop = events.get_event_loop()
304
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400305 fs = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700306
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700307 return (yield from _wait(fs, timeout, return_when, loop))
308
309
Victor Stinner59e08022014-08-28 11:19:25 +0200310def _release_waiter(waiter, *args):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700311 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200312 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700313
314
315@coroutine
316def wait_for(fut, timeout, *, loop=None):
317 """Wait for the single Future or coroutine to complete, with timeout.
318
319 Coroutine will be wrapped in Task.
320
Victor Stinner421e49b2014-01-23 17:40:59 +0100321 Returns result of the Future or coroutine. When a timeout occurs,
322 it cancels the task and raises TimeoutError. To avoid the task
323 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700324
Victor Stinner922bc2c2015-01-15 16:29:10 +0100325 If the wait is cancelled, the task is also cancelled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700326
Victor Stinner922bc2c2015-01-15 16:29:10 +0100327 This function is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700328 """
329 if loop is None:
330 loop = events.get_event_loop()
331
Guido van Rossum48c66c32014-01-29 14:30:38 -0800332 if timeout is None:
333 return (yield from fut)
334
Yury Selivanov7661db62016-05-16 15:38:39 -0400335 waiter = loop.create_future()
Victor Stinner59e08022014-08-28 11:19:25 +0200336 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
337 cb = functools.partial(_release_waiter, waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700338
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400339 fut = ensure_future(fut, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700340 fut.add_done_callback(cb)
341
342 try:
Victor Stinner59e08022014-08-28 11:19:25 +0200343 # wait until the future completes or the timeout
Victor Stinner922bc2c2015-01-15 16:29:10 +0100344 try:
345 yield from waiter
346 except futures.CancelledError:
347 fut.remove_done_callback(cb)
348 fut.cancel()
349 raise
Victor Stinner59e08022014-08-28 11:19:25 +0200350
351 if fut.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700352 return fut.result()
353 else:
354 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100355 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700356 raise futures.TimeoutError()
357 finally:
358 timeout_handle.cancel()
359
360
361@coroutine
362def _wait(fs, timeout, return_when, loop):
Victor Stinner2ba8ece2016-04-01 21:39:09 +0200363 """Internal helper for wait() and wait_for().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700364
365 The fs argument must be a collection of Futures.
366 """
367 assert fs, 'Set of Futures is empty.'
Yury Selivanov7661db62016-05-16 15:38:39 -0400368 waiter = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700369 timeout_handle = None
370 if timeout is not None:
371 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
372 counter = len(fs)
373
374 def _on_completion(f):
375 nonlocal counter
376 counter -= 1
377 if (counter <= 0 or
378 return_when == FIRST_COMPLETED or
379 return_when == FIRST_EXCEPTION and (not f.cancelled() and
380 f.exception() is not None)):
381 if timeout_handle is not None:
382 timeout_handle.cancel()
383 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200384 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700385
386 for f in fs:
387 f.add_done_callback(_on_completion)
388
389 try:
390 yield from waiter
391 finally:
392 if timeout_handle is not None:
393 timeout_handle.cancel()
394
395 done, pending = set(), set()
396 for f in fs:
397 f.remove_done_callback(_on_completion)
398 if f.done():
399 done.add(f)
400 else:
401 pending.add(f)
402 return done, pending
403
404
405# This is *not* a @coroutine! It is just an iterator (yielding Futures).
406def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800407 """Return an iterator whose values are coroutines.
408
409 When waiting for the yielded coroutines you'll get the results (or
410 exceptions!) of the original Futures (or coroutines), in the order
411 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700412
413 This differs from PEP 3148; the proper way to use this is:
414
415 for f in as_completed(fs):
416 result = yield from f # The 'yield from' may raise.
417 # Use result.
418
Guido van Rossumb58f0532014-02-12 17:58:19 -0800419 If a timeout is specified, the 'yield from' will raise
420 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700421
422 Note: The futures 'f' are not necessarily members of fs.
423 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700424 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100425 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700426 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400427 todo = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800428 from .queues import Queue # Import here to avoid circular import problem.
429 done = Queue(loop=loop)
430 timeout_handle = None
431
432 def _on_timeout():
433 for f in todo:
434 f.remove_done_callback(_on_completion)
435 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
436 todo.clear() # Can't do todo.remove(f) in the loop.
437
438 def _on_completion(f):
439 if not todo:
440 return # _on_timeout() was here first.
441 todo.remove(f)
442 done.put_nowait(f)
443 if not todo and timeout_handle is not None:
444 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700445
446 @coroutine
447 def _wait_for_one():
Guido van Rossumb58f0532014-02-12 17:58:19 -0800448 f = yield from done.get()
449 if f is None:
450 # Dummy value from _on_timeout().
451 raise futures.TimeoutError
452 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700453
Guido van Rossumb58f0532014-02-12 17:58:19 -0800454 for f in todo:
455 f.add_done_callback(_on_completion)
456 if todo and timeout is not None:
457 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700458 for _ in range(len(todo)):
459 yield _wait_for_one()
460
461
462@coroutine
463def sleep(delay, result=None, *, loop=None):
464 """Coroutine that completes after a given time (in seconds)."""
Yury Selivanovade04122015-11-05 14:29:04 -0500465 if delay == 0:
466 yield
467 return result
468
Yury Selivanov7661db62016-05-16 15:38:39 -0400469 if loop is None:
470 loop = events.get_event_loop()
471 future = loop.create_future()
Victor Stinnera9acbe82014-07-05 15:29:41 +0200472 h = future._loop.call_later(delay,
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500473 futures._set_result_unless_cancelled,
474 future, result)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700475 try:
476 return (yield from future)
477 finally:
478 h.cancel()
479
480
Yury Selivanov4357cf62016-09-15 13:49:08 -0400481def async_(coro_or_future, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700482 """Wrap a coroutine in a future.
483
484 If the argument is a Future, it is returned directly.
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400485
486 This function is deprecated in 3.5. Use asyncio.ensure_future() instead.
487 """
488
489 warnings.warn("asyncio.async() function is deprecated, use ensure_future()",
Mariatta Wijaya4e7ff8b2017-02-06 22:03:00 -0800490 DeprecationWarning,
491 stacklevel=2)
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400492
493 return ensure_future(coro_or_future, loop=loop)
494
Yury Selivanov4357cf62016-09-15 13:49:08 -0400495# Silence DeprecationWarning:
496globals()['async'] = async_
497async_.__name__ = 'async'
498del async_
499
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400500
501def ensure_future(coro_or_future, *, loop=None):
Yury Selivanov620279b2015-10-02 15:00:19 -0400502 """Wrap a coroutine or an awaitable in a future.
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400503
504 If the argument is a Future, it is returned directly.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700505 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700506 if futures.isfuture(coro_or_future):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700507 if loop is not None and loop is not coro_or_future._loop:
508 raise ValueError('loop argument must agree with Future')
509 return coro_or_future
Victor Stinnerf951d282014-06-29 00:46:45 +0200510 elif coroutines.iscoroutine(coro_or_future):
Victor Stinner896a25a2014-07-08 11:29:25 +0200511 if loop is None:
512 loop = events.get_event_loop()
513 task = loop.create_task(coro_or_future)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200514 if task._source_traceback:
515 del task._source_traceback[-1]
516 return task
Yury Selivanov620279b2015-10-02 15:00:19 -0400517 elif compat.PY35 and inspect.isawaitable(coro_or_future):
518 return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700519 else:
Yury Selivanov620279b2015-10-02 15:00:19 -0400520 raise TypeError('A Future, a coroutine or an awaitable is required')
521
522
523@coroutine
524def _wrap_awaitable(awaitable):
525 """Helper for asyncio.ensure_future().
526
527 Wraps awaitable (an object with __await__) into a coroutine
528 that will later be wrapped in a Task by ensure_future().
529 """
530 return (yield from awaitable.__await__())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700531
532
533class _GatheringFuture(futures.Future):
534 """Helper for gather().
535
536 This overrides cancel() to cancel all the children and act more
537 like Task.cancel(), which doesn't immediately mark itself as
538 cancelled.
539 """
540
541 def __init__(self, children, *, loop=None):
542 super().__init__(loop=loop)
543 self._children = children
544
545 def cancel(self):
546 if self.done():
547 return False
Yury Selivanov3d676152016-10-21 17:22:17 -0400548 ret = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700549 for child in self._children:
Yury Selivanov3d676152016-10-21 17:22:17 -0400550 if child.cancel():
551 ret = True
552 return ret
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700553
554
555def gather(*coros_or_futures, loop=None, return_exceptions=False):
556 """Return a future aggregating results from the given coroutines
557 or futures.
558
Guido van Rossume3c65a72016-09-30 08:17:15 -0700559 Coroutines will be wrapped in a future and scheduled in the event
560 loop. They will not necessarily be scheduled in the same order as
561 passed in.
562
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700563 All futures must share the same event loop. If all the tasks are
564 done successfully, the returned future's result is the list of
565 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500566 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700567 exceptions in the tasks are treated the same as successful
568 results, and gathered in the result list; otherwise, the first
569 raised exception will be immediately propagated to the returned
570 future.
571
572 Cancellation: if the outer Future is cancelled, all children (that
573 have not completed yet) are also cancelled. If any child is
574 cancelled, this is treated as if it raised CancelledError --
575 the outer Future is *not* cancelled in this case. (This is to
576 prevent the cancellation of one child to cause other children to
577 be cancelled.)
578 """
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200579 if not coros_or_futures:
Yury Selivanov7661db62016-05-16 15:38:39 -0400580 if loop is None:
581 loop = events.get_event_loop()
582 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700583 outer.set_result([])
584 return outer
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200585
586 arg_to_fut = {}
587 for arg in set(coros_or_futures):
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700588 if not futures.isfuture(arg):
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400589 fut = ensure_future(arg, loop=loop)
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200590 if loop is None:
591 loop = fut._loop
592 # The caller cannot control this future, the "destroy pending task"
593 # warning should not be emitted.
594 fut._log_destroy_pending = False
595 else:
596 fut = arg
597 if loop is None:
598 loop = fut._loop
599 elif fut._loop is not loop:
600 raise ValueError("futures are tied to different event loops")
601 arg_to_fut[arg] = fut
602
603 children = [arg_to_fut[arg] for arg in coros_or_futures]
604 nchildren = len(children)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700605 outer = _GatheringFuture(children, loop=loop)
606 nfinished = 0
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200607 results = [None] * nchildren
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700608
609 def _done_callback(i, fut):
610 nonlocal nfinished
Victor Stinner3531d902015-01-09 01:42:52 +0100611 if outer.done():
612 if not fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700613 # Mark exception retrieved.
614 fut.exception()
615 return
Victor Stinner3531d902015-01-09 01:42:52 +0100616
Victor Stinner29342622015-01-29 14:15:19 +0100617 if fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700618 res = futures.CancelledError()
619 if not return_exceptions:
620 outer.set_exception(res)
621 return
622 elif fut._exception is not None:
623 res = fut.exception() # Mark exception retrieved.
624 if not return_exceptions:
625 outer.set_exception(res)
626 return
627 else:
628 res = fut._result
629 results[i] = res
630 nfinished += 1
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200631 if nfinished == nchildren:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700632 outer.set_result(results)
633
634 for i, fut in enumerate(children):
635 fut.add_done_callback(functools.partial(_done_callback, i))
636 return outer
637
638
639def shield(arg, *, loop=None):
640 """Wait for a future, shielding it from cancellation.
641
642 The statement
643
644 res = yield from shield(something())
645
646 is exactly equivalent to the statement
647
648 res = yield from something()
649
650 *except* that if the coroutine containing it is cancelled, the
651 task running in something() is not cancelled. From the POV of
652 something(), the cancellation did not happen. But its caller is
653 still cancelled, so the yield-from expression still raises
654 CancelledError. Note: If something() is cancelled by other means
655 this will still cancel shield().
656
657 If you want to completely ignore cancellation (not recommended)
658 you can combine shield() with a try/except clause, as follows:
659
660 try:
661 res = yield from shield(something())
662 except CancelledError:
663 res = None
664 """
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400665 inner = ensure_future(arg, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700666 if inner.done():
667 # Shortcut.
668 return inner
669 loop = inner._loop
Yury Selivanov7661db62016-05-16 15:38:39 -0400670 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700671
672 def _done_callback(inner):
673 if outer.cancelled():
Victor Stinner3531d902015-01-09 01:42:52 +0100674 if not inner.cancelled():
675 # Mark inner's result as retrieved.
676 inner.exception()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700677 return
Victor Stinner3531d902015-01-09 01:42:52 +0100678
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700679 if inner.cancelled():
680 outer.cancel()
681 else:
682 exc = inner.exception()
683 if exc is not None:
684 outer.set_exception(exc)
685 else:
686 outer.set_result(inner.result())
687
688 inner.add_done_callback(_done_callback)
689 return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700690
691
692def run_coroutine_threadsafe(coro, loop):
693 """Submit a coroutine object to a given event loop.
694
695 Return a concurrent.futures.Future to access the result.
696 """
697 if not coroutines.iscoroutine(coro):
698 raise TypeError('A coroutine object is required')
699 future = concurrent.futures.Future()
700
701 def callback():
Guido van Rossum601953b2015-10-05 16:20:00 -0700702 try:
703 futures._chain_future(ensure_future(coro, loop=loop), future)
704 except Exception as exc:
705 if future.set_running_or_notify_cancel():
706 future.set_exception(exc)
707 raise
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700708
709 loop.call_soon_threadsafe(callback)
710 return future