blob: 78e76003b3ac220db5ef2ab2168148e77d374176 [file] [log] [blame]
"""Support for tasks, coroutines and the scheduler."""

# Names exported by "from <this module> import *".  The underscore-prefixed
# entries at the end are listed explicitly because star-imports would
# otherwise skip them.
__all__ = (
    'Task', 'create_task',
    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
    'wait', 'wait_for', 'as_completed', 'sleep',
    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
    'current_task', 'all_tasks',
    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import concurrent.futures
Yury Selivanovf23746a2018-01-22 19:11:18 -050013import contextvars
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import functools
15import inspect
Alex Grönholmcca4eec2018-08-09 00:06:47 +030016import itertools
Andrew Svetlov5f841b52017-12-09 00:23:48 +020017import types
Yury Selivanov59eb9a42015-05-11 14:48:38 -040018import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019import weakref
20
Yury Selivanova0c1ba62016-10-28 12:52:37 -040021from . import base_tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020022from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070024from . import exceptions
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025from . import futures
Andrew Svetlov68b34a72019-05-16 17:52:10 +030026from .coroutines import _is_coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027
# Source of the numeric suffix used for default task names ("Task-<n>").
# Each call returns the next integer, starting at 1.  An itertools.count()
# iterator is advanced instead of incrementing a module-level integer with
# "+= 1", because the latter is not thread safe (see bpo-11866).
_task_name_counter = functools.partial(next, itertools.count(1))
32
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033
def current_task(loop=None):
    """Return the task currently being executed on *loop*.

    When *loop* is omitted, the running event loop (as reported by
    events.get_running_loop()) is used.  The result is whatever the
    module-level ``_current_tasks`` mapping holds for that loop.
    """
    target_loop = events.get_running_loop() if loop is None else loop
    return _current_tasks.get(target_loop)
39
40
def all_tasks(loop=None):
    """Return a set of all tasks for the loop that are not done yet.

    If *loop* is None, the running event loop (events.get_running_loop())
    is used.  Only tasks whose loop is *loop* and whose done() is false are
    included.
    """
    if loop is None:
        loop = events.get_running_loop()
    # _all_tasks can be modified while it is being iterated (for example
    # from another thread), in which case the iteration raises RuntimeError;
    # see https://bugs.python.org/issue34970 and
    # https://bugs.python.org/issue36607.  Take a list() snapshot before
    # filtering.  Building the list is itself an iteration, so retry it a
    # bounded number of times before giving up and re-raising.
    attempts = 0
    while True:
        try:
            tasks = list(_all_tasks)
        except RuntimeError:
            attempts += 1
            if attempts >= 1000:
                raise
        else:
            break
    return {t for t in tasks
            if futures._get_loop(t) is loop and not t.done()}
49
50
def _all_tasks_compat(loop=None):
    """Return a set of *all* tasks for the loop, including completed ones.

    Differs from all_tasks() by not filtering out tasks that are done.
    Used to implement the deprecated Task.all_tasks() method.  If *loop* is
    None, events.get_event_loop() is used.
    """
    if loop is None:
        loop = events.get_event_loop()
    # _all_tasks can be modified while it is being iterated (for example
    # from another thread), in which case the iteration raises RuntimeError;
    # see https://bugs.python.org/issue34970 and
    # https://bugs.python.org/issue36607.  Take a list() snapshot before
    # filtering.  Building the list is itself an iteration, so retry it a
    # bounded number of times before giving up and re-raising.
    attempts = 0
    while True:
        try:
            tasks = list(_all_tasks)
        except RuntimeError:
            attempts += 1
            if attempts >= 1000:
                raise
        else:
            break
    return {t for t in tasks if futures._get_loop(t) is loop}
Andrew Svetlov44d1a592017-12-16 21:58:38 +020060
61
def _set_task_name(task, name):
    """Give *task* the name *name*, when that is possible.

    Nothing happens if *name* is None, or if *task* has no ``set_name``
    attribute (the AttributeError from the lookup is swallowed).
    """
    if name is None:
        return
    try:
        rename = task.set_name
    except AttributeError:
        # Task-like object without naming support: leave it untouched.
        return
    rename(name)
70
71
class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().
    #
    # ("_step()" and "_wakeup()" above are the private methods __step()
    # and __wakeup() defined at the end of this class.)

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.

        Deprecated: emits a DeprecationWarning and delegates to the
        module-level current_task().
        """
        warnings.warn("Task.current_task() is deprecated since Python 3.7, "
                      "use asyncio.current_task() instead",
                      DeprecationWarning,
                      stacklevel=2)
        if loop is None:
            loop = events.get_event_loop()
        return current_task(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.

        Deprecated: emits a DeprecationWarning and delegates to
        _all_tasks_compat(), which also includes tasks that are done.
        """
        warnings.warn("Task.all_tasks() is deprecated since Python 3.7, "
                      "use asyncio.all_tasks() instead",
                      DeprecationWarning,
                      stacklevel=2)
        return _all_tasks_compat(loop)

    def __init__(self, coro, *, loop=None, name=None):
        """Wrap the coroutine object *coro* and schedule its first step.

        *name* is converted with str(); when it is None a name of the form
        'Task-<n>' is generated.  Raises TypeError if *coro* is not a
        coroutine.
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the last entry of the captured traceback
            # (presumably the frame of this constructor).
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        # Set by cancel() when the cancellation has to be delivered by the
        # next __step() call.
        self._must_cancel = False
        # The future this task is currently waiting on, if any (see the
        # invariant at the top of the class).
        self._fut_waiter = None
        self._coro = coro
        # Every step and wakeup callback of this task is scheduled with
        # this context copy.
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    def __del__(self):
        """Report a task destroyed while still pending, then defer to Future.

        The report goes to the loop's exception handler and is suppressed
        when _log_destroy_pending is false.
        """
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    def _repr_info(self):
        """Return the repr details computed by base_tasks._task_repr_info()."""
        return base_tasks._task_repr_info(self)

    def get_name(self):
        """Return the task's name."""
        return self._name

    def set_name(self, value):
        """Set the task's name to str(value)."""
        self._name = str(value)

    def set_result(self, result):
        """Always raise RuntimeError: a Task's result cannot be set directly."""
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        """Always raise RuntimeError: a Task's exception cannot be set directly."""
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Return False if the task is already done, True otherwise.
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        return True

    def __step(self, exc=None):
        """Advance the wrapped coroutine by one step.

        If *exc* is None, None is sent into the coroutine; otherwise *exc*
        is thrown into it.  Depending on the outcome the task is completed
        (result, exception or cancellation), starts waiting on the yielded
        future, or schedules another step.  Raises InvalidStateError if the
        task is already done.
        """
        if self.done():
            raise exceptions.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            # A cancellation was requested via cancel(): deliver it now as
            # a CancelledError unless one is already being delivered.
            if not isinstance(exc, exceptions.CancelledError):
                exc = exceptions.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            # The coroutine returned; exc.value carries its return value.
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().set_exception(exceptions.CancelledError())
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
            # Record the exception on the task, but let it keep propagating
            # to the caller of this step.
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            # The coroutine yielded `result`; decide how to resume it.
            # A non-None `_asyncio_future_blocking` attribute marks a
            # Future-like object.
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        # Start waiting on the yielded future: __wakeup()
                        # runs when it completes.
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            # cancel() was called while the coroutine was
                            # running: forward it to the new waiter.
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    # The blocking flag is set but false.
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        """Done-callback for the awaited future: resume the coroutine.

        If *future* finished with an exception (including cancellation),
        that exception is passed to __step(); otherwise __step() is called
        without arguments.
        """
        try:
            future.result()
        except BaseException as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.
333
334
# Keep the Task class defined above reachable under its own name, because
# the name Task may be rebound just below.
_PyTask = Task


# If the _asyncio module can be imported, its Task replaces the class
# defined above as the public Task (presumably the C-accelerated
# implementation -- confirm).  Otherwise Task is left unchanged.
try:
    import _asyncio
except ImportError:
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task
345
346
def create_task(coro, *, name=None):
    """Schedule the execution of a coroutine object in a spawn task.

    The task is created on the running event loop (an error from
    events.get_running_loop() propagates when there is none), *name* is
    applied through _set_task_name(), and the Task object is returned.
    """
    new_task = events.get_running_loop().create_task(coro)
    _set_task_name(new_task, name)
    return new_task
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200356
357
# wait() and as_completed() similar to those in PEP 3148.

# The return_when constants are re-exported from concurrent.futures.
FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED = (
    concurrent.futures.FIRST_COMPLETED,
    concurrent.futures.FIRST_EXCEPTION,
    concurrent.futures.ALL_COMPLETED,
)
363
364
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures and coroutines given by fs to complete.

    *fs* must be a non-empty collection; passing a single future or
    coroutine raises TypeError, an empty collection raises ValueError,
    and so does an unknown *return_when* value.

    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = await asyncio.wait(fs)

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.
    """
    given_single_awaitable = futures.isfuture(fs) or coroutines.iscoroutine(fs)
    if given_single_awaitable:
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    accepted_modes = (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED)
    if return_when not in accepted_modes:
        raise ValueError(f'Invalid return_when value: {return_when}')

    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_running_loop()

    # De-duplicate the inputs first, then make sure each one is a future.
    unique_inputs = set(fs)
    fs = {ensure_future(item, loop=loop) for item in unique_inputs}

    return await _wait(fs, timeout, return_when, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700398
399
def _release_waiter(waiter, *args):
    """Complete *waiter* with a None result unless it is already done.

    Extra positional arguments are accepted and ignored, so the function
    can be used both as a timer callback and as a future done-callback.
    """
    if waiter.done():
        return
    waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700403
404
async def wait_for(fut, timeout, *, loop=None):
    """Wait for the single Future or coroutine to complete, with timeout.

    Coroutine will be wrapped in Task.

    Returns result of the Future or coroutine.  When a timeout occurs,
    it cancels the task and raises TimeoutError.  To avoid the task
    cancellation, wrap it in shield().

    A *timeout* of None awaits *fut* directly, with no time limit.

    If the wait is cancelled, the task is also cancelled.

    This function is a coroutine.
    """
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_running_loop()

    if timeout is None:
        return await fut

    if timeout <= 0:
        # No time to wait at all: either the result is already there, or
        # the operation is cancelled and reported as timed out.
        fut = ensure_future(fut, loop=loop)
        if not fut.done():
            fut.cancel()
            raise exceptions.TimeoutError()
        return fut.result()

    # `waiter` is released by whichever happens first: the timer firing
    # or `fut` completing.
    waiter = loop.create_future()
    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    release = functools.partial(_release_waiter, waiter)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(release)

    try:
        # wait until the future completes or the timeout
        try:
            await waiter
        except exceptions.CancelledError:
            fut.remove_done_callback(release)
            fut.cancel()
            raise

        if not fut.done():
            fut.remove_done_callback(release)
            # We must ensure that the task is not running
            # after wait_for() returns.
            # See https://bugs.python.org/issue32751
            await _cancel_and_wait(fut, loop=loop)
            raise exceptions.TimeoutError()
        return fut.result()
    finally:
        timeout_handle.cancel()
464
465
async def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a collection of Futures.  Returns a
    (done, pending) pair of sets once the *return_when* condition is met
    or, when *timeout* is not None, once the timeout elapses.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    timeout_handle = None
    if timeout is not None:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)

    def _on_completion(f):
        nonlocal counter
        counter -= 1
        finished = counter <= 0 or return_when == FIRST_COMPLETED
        if not finished and return_when == FIRST_EXCEPTION:
            # Only look at the exception of a future that was not
            # cancelled.
            finished = not f.cancelled() and f.exception() is not None
        if not finished:
            return
        if timeout_handle is not None:
            timeout_handle.cancel()
        if not waiter.done():
            waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        await waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()
        for f in fs:
            f.remove_done_callback(_on_completion)

    # Split the futures by their state at this point.
    done = {f for f in fs if f.done()}
    pending = {f for f in fs if f not in done}
    return done, pending
508
509
async def _cancel_and_wait(fut, loop):
    """Cancel the *fut* future or task and wait until it completes."""

    # A separate waiter future is released by a done-callback on *fut*.
    finished = loop.create_future()
    on_done = functools.partial(_release_waiter, finished)
    fut.add_done_callback(on_done)

    try:
        fut.cancel()
        # We cannot wait on *fut* directly to make
        # sure _cancel_and_wait itself is reliably cancellable.
        await finished
    finally:
        fut.remove_done_callback(on_done)
523 fut.remove_done_callback(cb)
524
525
# This is *not* a @coroutine!  It is just an iterator (yielding coroutines).
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    When waiting for the yielded coroutines you'll get the results (or
    exceptions!) of the original Futures (or coroutines), in the order
    in which and as soon as they complete.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = await f  # The 'await' may raise.
            # Use result.

    If a timeout is specified, the 'await' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if loop is None:
        loop = events.get_event_loop()
    todo = {ensure_future(item, loop=loop) for item in set(fs)}
    from .queues import Queue  # Import here to avoid circular import problem.
    done = Queue(loop=loop)
    timeout_handle = None

    def _on_timeout():
        # Stop listening to the unfinished futures and wake up one waiter
        # with the timeout marker.
        for unfinished in todo:
            unfinished.remove_done_callback(_on_completion)
        done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
        todo.clear()  # Can't do todo.remove(f) in the loop.

    def _on_completion(finished):
        if not todo:
            return  # _on_timeout() was here first.
        todo.remove(finished)
        done.put_nowait(finished)
        if timeout_handle is not None and not todo:
            # Everything completed in time; the timer is no longer needed.
            timeout_handle.cancel()

    async def _wait_for_one():
        completed = await done.get()
        if completed is None:
            # Dummy value from _on_timeout().
            raise exceptions.TimeoutError
        return completed.result()  # May raise f.exception().

    for pending in todo:
        pending.add_done_callback(_on_completion)
    if todo and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    # One coroutine per input future.
    for _ in range(len(todo)):
        yield _wait_for_one()
580
581
@types.coroutine
def __sleep0():
    """Give up control for exactly one event loop cycle.

    Private helper for 'asyncio.sleep()', used when the 'delay' is not
    positive.  A bare 'yield' expression is used (Task.__step knows how
    to handle it) so that no Future object has to be created.
    """
    yield
592
593
async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    *result* is returned to the caller once the delay has passed.  A
    *delay* of zero or less only yields to the event loop once.
    """
    if delay <= 0:
        await __sleep0()
        return result

    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_running_loop()

    # A timer sets the result on the future unless it was cancelled first.
    future = loop.create_future()
    timer = loop.call_later(
        delay, futures._set_result_unless_cancelled, future, result)
    try:
        return await future
    finally:
        timer.cancel()
615
616
def ensure_future(coro_or_future, *, loop=None):
    """Return a future for the given coroutine, future or awaitable.

    A coroutine is scheduled as a task on *loop* (or on the loop returned
    by events.get_event_loop() when *loop* is None).  A Future is returned
    unchanged.  Any other awaitable is first wrapped in a coroutine and
    then scheduled.

    Raises ValueError if a Future is given together with a *loop* that is
    not the future's own loop, and TypeError if the argument is none of
    the supported kinds.
    """
    if coroutines.iscoroutine(coro_or_future):
        target_loop = events.get_event_loop() if loop is None else loop
        new_task = target_loop.create_task(coro_or_future)
        if new_task._source_traceback:
            # Drop the last traceback entry recorded for the new task.
            del new_task._source_traceback[-1]
        return new_task

    if futures.isfuture(coro_or_future):
        if loop is not None:
            if futures._get_loop(coro_or_future) is not loop:
                raise ValueError('The future belongs to a different loop than '
                                 'the one specified as the loop argument')
        return coro_or_future

    if not inspect.isawaitable(coro_or_future):
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')

    return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
Yury Selivanov620279b2015-10-02 15:00:19 -0400639
640
@types.coroutine
def _wrap_awaitable(awaitable):
    """Adapt an object defining __await__ into a coroutine.

    Helper for asyncio.ensure_future(): the coroutine produced here is
    what ensure_future() subsequently wraps in a Task.
    """
    inner_iterator = awaitable.__await__()
    outcome = yield from inner_iterator
    return outcome


# Tag the helper with the marker object imported from .coroutines.
_wrap_awaitable._is_coroutine = _is_coroutine
651
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700652
class _GatheringFuture(futures.Future):
    """Future returned by gather().

    Its cancel() forwards the request to every child instead of marking
    this future as cancelled right away, which mirrors the way
    Task.cancel() behaves.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children
        # Set by cancel() once at least one child accepted cancellation.
        self._cancel_requested = False

    def cancel(self):
        """Cancel all children; return True if any of them was cancelled."""
        if self.done():
            return False
        # Build the full list first so that every child receives cancel();
        # short-circuiting here would skip the remaining children.
        accepted = [bool(child.cancel()) for child in self._children]
        any_cancelled = any(accepted)
        if any_cancelled:
            # At least one child was really cancelled, so the request has
            # to be propagated no matter what *return_exceptions* says.
            # See issue 32684.
            self._cancel_requested = True
        return any_cancelled
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700679
680
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Aggregate the results of several coroutines/futures in one future.

    Each coroutine argument is wrapped in a future and scheduled on the
    event loop; the scheduling order is not guaranteed to follow the
    argument order.  All futures must share the same event loop.

    When every child finishes successfully, the result of the returned
    future is the list of their results, ordered like the arguments (not
    by completion time).  If *return_exceptions* is True, an exception
    from a child is stored in that list like any ordinary result;
    otherwise the first exception raised is propagated immediately to
    the returned future.

    Cancellation: cancelling the returned future also cancels every child
    that has not completed yet.  A cancelled child is handled as if it
    had raised CancelledError -- the returned future is *not* cancelled
    in that case, so cancelling one child does not cancel the others.
    """
    if not coros_or_futures:
        # Nothing to wait for: hand back an already completed future.
        if loop is None:
            loop = events.get_event_loop()
        outer = loop.create_future()
        outer.set_result([])
        return outer

    def _outcome_of(child):
        # cancelled() has to be tested first because exception() *raises*
        # CancelledError on a cancelled future instead of returning it.
        if child.cancelled():
            return exceptions.CancelledError()
        error = child.exception()
        return child.result() if error is None else error

    def _done_callback(fut):
        nonlocal completed
        completed += 1

        if outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if not return_exceptions:
            # Same ordering concern as in _outcome_of(): look at
            # cancelled() before calling exception().
            if fut.cancelled():
                failure = exceptions.CancelledError()
            else:
                failure = fut.exception()
            if failure is not None:
                outer.set_exception(failure)
                return

        if completed != expected:
            return

        # Every distinct child is done: collect one entry per argument.
        results = [_outcome_of(child) for child in children]

        if outer._cancel_requested:
            # gather itself is being cancelled, so the cancellation is
            # propagated regardless of *return_exceptions*.
            # See issue 32684.
            outer.set_exception(exceptions.CancelledError())
        else:
            outer.set_result(results)

    arg_to_fut = {}
    children = []
    expected = 0
    completed = 0
    for arg in coros_or_futures:
        if arg in arg_to_fut:
            # The same object was passed more than once; reuse its future
            # without registering the callback a second time.
            children.append(arg_to_fut[arg])
            continue

        fut = ensure_future(arg, loop=loop)
        if loop is None:
            loop = futures._get_loop(fut)
        if fut is not arg:
            # 'arg' was not a Future, so 'fut' was created just for it.
            # The caller has no handle on it, hence the "destroy pending
            # task" warning is switched off.
            fut._log_destroy_pending = False

        expected += 1
        arg_to_fut[arg] = fut
        fut.add_done_callback(_done_callback)
        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    return outer
788
789
def shield(arg, *, loop=None):
    """Wait for a future while protecting it from cancellation.

    Writing

        res = await shield(something())

    behaves exactly like

        res = await something()

    *except* when the coroutine containing the statement is cancelled:
    the task running in something() is then left alone.  As far as
    something() can tell, no cancellation took place.  The caller is
    still cancelled, though, so the await expression still raises
    CancelledError.  Note: if something() is cancelled by other means,
    shield() is cancelled as well.

    To ignore cancellation altogether (not recommended), combine
    shield() with a try/except clause:

        try:
            res = await shield(something())
        except CancelledError:
            res = None
    """
    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Shortcut: nothing left to protect.
        return inner
    loop = futures._get_loop(inner)
    outer = loop.create_future()

    def _on_inner_done(finished):
        if outer.cancelled():
            if not finished.cancelled():
                # Mark inner's result as retrieved.
                finished.exception()
            return

        if finished.cancelled():
            outer.cancel()
            return

        error = finished.exception()
        if error is None:
            outer.set_result(finished.result())
        else:
            outer.set_exception(error)

    def _on_outer_done(_finished_outer):
        # Once outer is done, a still-pending inner no longer needs to
        # report back to it.
        if not inner.done():
            inner.remove_done_callback(_on_inner_done)

    inner.add_done_callback(_on_inner_done)
    outer.add_done_callback(_on_outer_done)
    return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700847
848
def run_coroutine_threadsafe(coro, loop):
    """Schedule a coroutine object on the given event loop.

    The returned concurrent.futures.Future gives access to the result.
    Raises TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    result_future = concurrent.futures.Future()

    def _schedule():
        try:
            task = ensure_future(coro, loop=loop)
            futures._chain_future(task, result_future)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # Report the failure through the returned future (unless it
            # was already cancelled), then let it propagate as well.
            if result_future.set_running_or_notify_cancel():
                result_future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(_schedule)
    return result_future
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200870
871
# Weakly referenced collection of every task that is still alive.
_all_tasks = weakref.WeakSet()

# The task currently active in each running event loop,
# stored as {EventLoop: Task}.
_current_tasks = {}
878
879
def _register_task(task):
    """Add *task* to the module-wide set of tasks known to asyncio."""
    _all_tasks.add(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200883
884
def _enter_task(loop, task):
    """Record *task* as the task currently executing on *loop*.

    Raises RuntimeError if *loop* already has a current task.
    """
    running = _current_tasks.get(loop)
    if running is None:
        _current_tasks[loop] = task
        return
    raise RuntimeError(f"Cannot enter into task {task!r} while another "
                       f"task {running!r} is being executed.")
891
892
def _leave_task(loop, task):
    """Clear the current-task entry of *loop*, which must be *task*.

    Raises RuntimeError if the task recorded for *loop* is not *task*.
    """
    running = _current_tasks.get(loop)
    if running is task:
        del _current_tasks[loop]
        return
    raise RuntimeError(f"Leaving task {task!r} does not match "
                       f"the current task {running!r}.")
899
900
def _unregister_task(task):
    """Remove *task* from the module-wide task set, if it is present."""
    _all_tasks.discard(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200904
905
# Keep the implementations defined above reachable under '_py_*' names,
# because the import below may rebind the unprefixed names.
_py_register_task, _py_unregister_task = _register_task, _unregister_task
_py_enter_task, _py_leave_task = _enter_task, _leave_task


try:
    from _asyncio import (_register_task, _unregister_task,
                          _enter_task, _leave_task,
                          _all_tasks, _current_tasks)
except ImportError:
    # '_asyncio' is not available: the definitions above stay in effect.
    pass
else:
    # Expose the implementations imported from '_asyncio' as '_c_*'.
    _c_register_task, _c_unregister_task = _register_task, _unregister_task
    _c_enter_task, _c_leave_task = _enter_task, _leave_task