blob: 95e85600a2e8025e858821379e92a639da6e2c34 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
# Names exported by a star-import of this module: the public task API
# first, then the underscore-prefixed task bookkeeping hooks.
__all__ = (
    'Task', 'create_task',
    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
    'wait', 'wait_for', 'as_completed', 'sleep',
    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
    'current_task', 'all_tasks',
    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import concurrent.futures
Yury Selivanovf23746a2018-01-22 19:11:18 -050013import contextvars
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import functools
15import inspect
Alex Grönholmcca4eec2018-08-09 00:06:47 +030016import itertools
Andrew Svetlov5f841b52017-12-09 00:23:48 +020017import types
Yury Selivanov59eb9a42015-05-11 14:48:38 -040018import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019import weakref
20
Yury Selivanova0c1ba62016-10-28 12:52:37 -040021from . import base_tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020022from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070024from . import exceptions
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025from . import futures
Andrew Svetlov68b34a72019-05-16 17:52:10 +030026from .coroutines import _is_coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027
# Helper to generate new task names.
# This uses itertools.count() instead of a "+= 1" operation because the latter
# is not thread safe. See bpo-11866 for a longer explanation.
# Each call returns the next integer of the sequence, starting at 1.
_task_name_counter = itertools.count(1).__next__
32
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033
def current_task(loop=None):
    """Return the task recorded as currently executing for *loop*.

    When *loop* is None the running event loop, as reported by
    events.get_running_loop(), is used for the lookup.
    """
    target_loop = events.get_running_loop() if loop is None else loop
    return _current_tasks.get(target_loop)
39
40
def all_tasks(loop=None):
    """Return the set of tasks bound to *loop* that are not done yet.

    When *loop* is None the running event loop, as reported by
    events.get_running_loop(), is used.
    """
    target_loop = events.get_running_loop() if loop is None else loop
    # NB: filter a list() snapshot of _all_tasks instead of iterating the
    # collection itself; this protects from
    # https://bugs.python.org/issue34970
    snapshot = list(_all_tasks)
    return {task for task in snapshot
            if futures._get_loop(task) is target_loop and not task.done()}
49
50
def _all_tasks_compat(loop=None):
    """Return every task bound to *loop*, finished ones included.

    Unlike all_tasks() this does not filter out completed tasks.  It backs
    the deprecated Task.all_tasks() class method.  When *loop* is None the
    loop returned by events.get_event_loop() is used.
    """
    target_loop = events.get_event_loop() if loop is None else loop
    # NB: filter a list() snapshot of _all_tasks instead of iterating the
    # collection itself; this protects from
    # https://bugs.python.org/issue34970
    snapshot = list(_all_tasks)
    return {task for task in snapshot
            if futures._get_loop(task) is target_loop}
Andrew Svetlov44d1a592017-12-16 21:58:38 +020060
61
Alex Grönholmcca4eec2018-08-09 00:06:47 +030062def _set_task_name(task, name):
63 if name is not None:
64 try:
65 set_name = task.set_name
66 except AttributeError:
67 pass
68 else:
69 set_name(name)
70
71
class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future.

    The coroutine is advanced one step at a time by the private __step()
    method, which is scheduled on the task's loop with call_soon() or
    re-entered through __wakeup() when an awaited future completes.
    """

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.

        Deprecated: emits a DeprecationWarning and delegates to the
        module-level current_task().
        """
        warnings.warn("Task.current_task() is deprecated since Python 3.7, "
                      "use asyncio.current_task() instead",
                      DeprecationWarning,
                      stacklevel=2)
        if loop is None:
            loop = events.get_event_loop()
        return current_task(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.

        Deprecated: emits a DeprecationWarning and delegates to
        _all_tasks_compat().
        """
        warnings.warn("Task.all_tasks() is deprecated since Python 3.7, "
                      "use asyncio.all_tasks() instead",
                      DeprecationWarning,
                      stacklevel=2)
        return _all_tasks_compat(loop)

    def __init__(self, coro, *, loop=None, name=None):
        """Wrap *coro* and schedule its first step on the loop.

        Raises TypeError if *coro* is not a coroutine according to
        coroutines.iscoroutine().  When *name* is None a name of the form
        'Task-<n>' is generated; otherwise str(name) is used.
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        # Set by cancel() when the cancellation has to be delivered by the
        # next __step() call.
        self._must_cancel = False
        # The future this task is currently blocked on, if any.
        self._fut_waiter = None
        self._coro = coro
        # Context snapshot passed to every callback scheduled by this task.
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    def __del__(self):
        """Report a task destroyed while pending, then run the base __del__.

        The report goes to the loop's exception handler and is skipped when
        _log_destroy_pending is false.
        """
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    def _repr_info(self):
        """Return the repr details produced by base_tasks._task_repr_info()."""
        return base_tasks._task_repr_info(self)

    def get_coro(self):
        """Return the coroutine object wrapped by this task."""
        return self._coro

    def get_name(self):
        """Return the task's name."""
        return self._name

    def set_name(self, value):
        """Set the task's name to str(value)."""
        self._name = str(value)

    def set_result(self, result):
        """Always raise RuntimeError: a task's result cannot be set directly."""
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        """Always raise RuntimeError: a task's exception cannot be set directly."""
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        return True

    def __step(self, exc=None):
        """Advance the wrapped coroutine by one step.

        With exc=None the coroutine is resumed with send(None); otherwise
        *exc* is thrown into it.  The outcome either finishes the task
        (result, exception or cancellation), parks it on the yielded
        future, or schedules another step.

        Raises InvalidStateError if the task is already done.
        """
        if self.done():
            raise exceptions.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            # A cancellation request is pending: deliver it in place of
            # whatever was about to be sent or thrown.
            if not isinstance(exc, exceptions.CancelledError):
                exc = exceptions.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().set_exception(exceptions.CancelledError())
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
            # Record the exception on the task, then let it propagate to
            # the caller of __step().
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        # Park on the yielded future; __wakeup() resumes
                        # the task when that future completes.
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        """Done-callback for the awaited future: resume the coroutine.

        If *future* finished with an exception (or was cancelled) the
        exception is passed to __step(); otherwise __step() is called
        without arguments.
        """
        try:
            future.result()
        except BaseException as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.
336
337
# Keep the pure-Python class reachable under a private name, because
# ``Task`` may be rebound just below.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    # _asyncio is unavailable: Task stays bound to the Python class above.
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task
348
349
def create_task(coro, *, name=None):
    """Schedule the coroutine object *coro* as a task on the running loop.

    The task is created with the running loop's create_task() method and,
    when *name* is not None, named through _set_task_name().

    Return the Task object.
    """
    new_task = events.get_running_loop().create_task(coro)
    _set_task_name(new_task, name)
    return new_task
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200359
360
# wait() and as_completed() similar to those in PEP 3148.

# The values accepted by wait() for its return_when argument, re-exported
# from concurrent.futures.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
366
367
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures and coroutines given by fs to complete.

    fs must be a non-empty iterable of Futures and/or coroutines.
    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = await asyncio.wait(fs)

    Raises TypeError if fs is itself a single Future or coroutine, and
    ValueError if fs is empty (an empty iterator or generator included)
    or if return_when is not one of FIRST_COMPLETED, FIRST_EXCEPTION,
    ALL_COMPLETED.

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError(f'Invalid return_when value: {return_when}')

    # Materialize fs exactly once and check again.  An iterator or generator
    # is always truthy, so the test above cannot see that it is empty; without
    # this second check an empty set would be handed to _wait().
    fs = set(fs)
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')

    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    fs = {ensure_future(f, loop=loop) for f in fs}

    return await _wait(fs, timeout, return_when, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700401
402
Victor Stinner59e08022014-08-28 11:19:25 +0200403def _release_waiter(waiter, *args):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700404 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200405 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700406
407
async def wait_for(fut, timeout, *, loop=None):
    """Wait for a single Future or coroutine to complete, with a timeout.

    A coroutine is wrapped in a Task.

    Returns the result of the Future or coroutine.  If the timeout expires
    first, the task is cancelled and TimeoutError is raised.  Wrap the
    awaitable in shield() to avoid its cancellation.

    If the wait itself is cancelled, the task is cancelled too.

    A timeout of None awaits *fut* directly, without any time limit.

    This function is a coroutine.
    """
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_running_loop()

    if timeout is None:
        return await fut

    if timeout <= 0:
        # No time to wait: only an already finished future can succeed.
        fut = ensure_future(fut, loop=loop)
        if not fut.done():
            fut.cancel()
            raise exceptions.TimeoutError()
        return fut.result()

    # wakeup is resolved by whichever happens first: the timer firing or
    # the future completing.
    wakeup = loop.create_future()
    timer = loop.call_later(timeout, _release_waiter, wakeup)
    on_done = functools.partial(_release_waiter, wakeup)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(on_done)

    try:
        # Wait until the future completes or the timeout expires.
        try:
            await wakeup
        except exceptions.CancelledError:
            fut.remove_done_callback(on_done)
            fut.cancel()
            raise

        if fut.done():
            return fut.result()

        fut.remove_done_callback(on_done)
        # We must ensure that the task is not running
        # after wait_for() returns.
        # See https://bugs.python.org/issue32751
        await _cancel_and_wait(fut, loop=loop)
        raise exceptions.TimeoutError()
    finally:
        timer.cancel()
467
468
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200469async def _wait(fs, timeout, return_when, loop):
Elvis Pranskevichuse2b340a2018-05-29 17:31:01 -0400470 """Internal helper for wait().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700471
472 The fs argument must be a collection of Futures.
473 """
474 assert fs, 'Set of Futures is empty.'
Yury Selivanov7661db62016-05-16 15:38:39 -0400475 waiter = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700476 timeout_handle = None
477 if timeout is not None:
478 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
479 counter = len(fs)
480
481 def _on_completion(f):
482 nonlocal counter
483 counter -= 1
484 if (counter <= 0 or
485 return_when == FIRST_COMPLETED or
486 return_when == FIRST_EXCEPTION and (not f.cancelled() and
487 f.exception() is not None)):
488 if timeout_handle is not None:
489 timeout_handle.cancel()
490 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200491 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700492
493 for f in fs:
494 f.add_done_callback(_on_completion)
495
496 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200497 await waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700498 finally:
499 if timeout_handle is not None:
500 timeout_handle.cancel()
gescheitc1964e92019-05-03 18:18:02 +0300501 for f in fs:
502 f.remove_done_callback(_on_completion)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700503
504 done, pending = set(), set()
505 for f in fs:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700506 if f.done():
507 done.add(f)
508 else:
509 pending.add(f)
510 return done, pending
511
512
async def _cancel_and_wait(fut, loop):
    """Cancel the *fut* future or task, then wait until it is done."""
    finished = loop.create_future()
    release = functools.partial(_release_waiter, finished)
    fut.add_done_callback(release)

    try:
        fut.cancel()
        # Await a separate future rather than *fut* itself, so that
        # _cancel_and_wait stays reliably cancellable.
        await finished
    finally:
        fut.remove_done_callback(release)
527
528
# This is *not* a coroutine function! It is a plain generator that yields
# coroutines.
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    Awaiting the yielded coroutines gives the results (or raises the
    exceptions!) of the original Futures (or coroutines), in the order in
    which they complete and as soon as they do.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = await f  # The 'await' may raise.
            # Use result.

    If a timeout is specified, the 'await' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if loop is None:
        loop = events.get_event_loop()
    todo = {ensure_future(f, loop=loop) for f in set(fs)}
    from .queues import Queue  # Import here to avoid circular import problem.
    done = Queue(loop=loop)
    timeout_handle = None

    def _on_timeout():
        for pending in todo:
            pending.remove_done_callback(_on_completion)
            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
        todo.clear()  # Can't do todo.remove(f) in the loop.

    def _on_completion(f):
        if not todo:
            return  # _on_timeout() was here first.
        todo.remove(f)
        done.put_nowait(f)
        if todo or timeout_handle is None:
            return
        # That was the last future: the timer is no longer needed.
        timeout_handle.cancel()

    async def _wait_for_one():
        finished = await done.get()
        if finished is None:
            # Dummy value from _on_timeout().
            raise exceptions.TimeoutError
        return finished.result()  # May raise f.exception().

    for f in todo:
        f.add_done_callback(_on_completion)
    if todo and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    # One coroutine per future; the count is fixed before the first yield.
    total = len(todo)
    for _ in range(total):
        yield _wait_for_one()
583
584
@types.coroutine
def __sleep0():
    """Skip one event loop run cycle.

    This is a private helper for 'asyncio.sleep()', used
    when the 'delay' is less than or equal to 0.  It uses a bare
    'yield' expression (which Task.__step knows how to handle)
    instead of creating a Future object.
    """
    yield
595
596
async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    *result* is returned to the caller once the delay has passed.  A delay
    of zero or less only yields to the event loop once via __sleep0().
    """
    if delay <= 0:
        await __sleep0()
        return result

    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_running_loop()

    wakeup = loop.create_future()
    timer = loop.call_later(
        delay, futures._set_result_unless_cancelled, wakeup, result)
    try:
        return await wakeup
    finally:
        # Drop the timer as well when the wait ends early, e.g. on
        # cancellation.
        timer.cancel()
618
619
def ensure_future(coro_or_future, *, loop=None):
    """Return a future for a coroutine, a Future or any awaitable.

    A coroutine is scheduled as a Task on *loop* (or on the loop from
    events.get_event_loop() when *loop* is None).  A Future is handed
    back unchanged, provided it is not bound to a loop other than the
    one requested.  Any other awaitable is first adapted into a
    coroutine and then treated like one.

    Raises ValueError for a Future belonging to a different loop and
    TypeError for an argument that is none of the above.
    """
    if coroutines.iscoroutine(coro_or_future):
        target_loop = events.get_event_loop() if loop is None else loop
        task = target_loop.create_task(coro_or_future)
        if task._source_traceback:
            # Remove the last recorded frame from the creation traceback.
            del task._source_traceback[-1]
        return task

    if futures.isfuture(coro_or_future):
        if loop is not None and futures._get_loop(coro_or_future) is not loop:
            raise ValueError('The future belongs to a different loop than '
                             'the one specified as the loop argument')
        return coro_or_future

    if not inspect.isawaitable(coro_or_future):
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')

    return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
Yury Selivanov620279b2015-10-02 15:00:19 -0400642
643
@types.coroutine
def _wrap_awaitable(awaitable):
    """Adapt an object with __await__ for asyncio.ensure_future().

    The resulting generator-based coroutine delegates to the iterator
    returned by awaitable.__await__() and returns its final value;
    ensure_future() subsequently wraps that coroutine in a Task.
    """
    outcome = yield from awaitable.__await__()
    return outcome


# Tag the wrapper with the marker imported from .coroutines
# (presumably so asyncio recognises it as a coroutine function).
_wrap_awaitable._is_coroutine = _is_coroutine
654
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700655
class _GatheringFuture(futures.Future):
    """Future handed out by gather().

    Its cancel() forwards the request to every child instead of
    marking this future as cancelled right away, which makes it behave
    more like Task.cancel().
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children
        # Becomes True once cancel() has actually cancelled a child.
        self._cancel_requested = False

    def cancel(self):
        """Cancel all children; return True if any of them accepted."""
        if self.done():
            return False

        # Build a list rather than feeding a generator to any(): every
        # child must receive the request, not only those up to the
        # first one that accepts it.
        accepted = [child.cancel() for child in self._children]
        if not any(accepted):
            return False

        # At least one child was really cancelled, so the cancellation
        # has to be propagated whatever *return_exceptions* says.
        # See issue 32684.
        self._cancel_requested = True
        return True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700682
683
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Aggregate the results of several coroutines/futures in one future.

    Each coroutine is wrapped in a future and scheduled in the event
    loop; the scheduling order need not match the argument order.

    All futures must belong to the same event loop.  When every child
    finishes successfully, the returned future's result is the list of
    their results, ordered like the arguments (not by completion time).
    With *return_exceptions* set to True, an exception raised by a
    child counts as its result and is placed in that list; otherwise
    the first exception raised is propagated to the returned future
    immediately.

    Cancellation: cancelling the outer Future cancels every child that
    has not completed yet.  A cancelled child is handled as if it had
    raised CancelledError -- the outer Future is *not* cancelled in
    that case, so that cancelling one child never causes the other
    children to be cancelled.
    """
    if not coros_or_futures:
        # Nothing to wait for: hand back an already-completed future.
        if loop is None:
            loop = events.get_event_loop()
        outer = loop.create_future()
        outer.set_result([])
        return outer

    def _outcome_of(child):
        # cancelled() has to be tested first: exception() would *raise*
        # CancelledError for a cancelled future instead of returning it.
        if child.cancelled():
            return exceptions.CancelledError()
        error = child.exception()
        if error is None:
            return child.result()
        return error

    def _done_callback(fut):
        nonlocal nfinished
        nfinished += 1

        if outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if not return_exceptions:
            if fut.cancelled():
                # Same ordering concern as in _outcome_of(): ask
                # cancelled() before touching exception().
                outer.set_exception(exceptions.CancelledError())
                return
            exc = fut.exception()
            if exc is not None:
                outer.set_exception(exc)
                return

        if nfinished != nfuts:
            # Some children are still pending.
            return

        # Every child is done: collect the outcomes in argument order.
        results = [_outcome_of(child) for child in children]

        if outer._cancel_requested:
            # gather() itself is being cancelled, which must be
            # propagated whatever *return_exceptions* says.
            # See issue 32684.
            outer.set_exception(exceptions.CancelledError())
        else:
            outer.set_result(results)

    arg_to_fut = {}
    children = []
    nfuts = 0
    nfinished = 0
    for arg in coros_or_futures:
        fut = arg_to_fut.get(arg)
        if fut is None:
            # First occurrence of this argument.
            fut = ensure_future(arg, loop=loop)
            if loop is None:
                loop = futures._get_loop(fut)
            if fut is not arg:
                # 'arg' was not a Future, so 'fut' was created just for
                # it and the caller has no handle on it; silence the
                # "destroy pending task" warning.
                fut._log_destroy_pending = False

            nfuts += 1
            arg_to_fut[arg] = fut
            fut.add_done_callback(_done_callback)
        # Otherwise 'arg' is a duplicate and its existing future is
        # reused without registering the callback again.

        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    return outer
791
792
def shield(arg, *, loop=None):
    """Wait for a future while protecting it from cancellation.

    The statement

        res = await shield(something())

    behaves exactly like

        res = await something()

    *except* when the coroutine containing it is cancelled: the task
    running in something() is then left alone.  From the point of view
    of something(), no cancellation happened.  The caller is still
    cancelled, however, so the await expression still raises
    CancelledError.  Note: if something() is cancelled by other means,
    shield() is cancelled as well.

    To ignore cancellation completely (not recommended), combine
    shield() with a try/except clause:

        try:
            res = await shield(something())
        except CancelledError:
            res = None
    """
    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Shortcut: nothing left to protect.
        return inner

    loop = futures._get_loop(inner)
    outer = loop.create_future()

    def _inner_done_callback(fut):
        if outer.cancelled():
            if not fut.cancelled():
                # Mark inner's result as retrieved.
                fut.exception()
            return

        if fut.cancelled():
            outer.cancel()
            return

        exc = fut.exception()
        if exc is None:
            outer.set_result(fut.result())
        else:
            outer.set_exception(exc)

    def _outer_done_callback(fut):
        # Once the outer future is finished there is nobody left to
        # notify, so stop watching a still-pending inner future.
        if not inner.done():
            inner.remove_done_callback(_inner_done_callback)

    inner.add_done_callback(_inner_done_callback)
    outer.add_done_callback(_outer_done_callback)
    return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700850
851
def run_coroutine_threadsafe(coro, loop):
    """Schedule a coroutine object on the given event loop.

    The scheduling is requested through loop.call_soon_threadsafe().
    Returns a concurrent.futures.Future giving access to the result.
    Raises TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')

    future = concurrent.futures.Future()

    def callback():
        try:
            task = ensure_future(coro, loop=loop)
            futures._chain_future(task, future)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # Report the failure through the returned future (unless
            # it was cancelled meanwhile), then let it propagate.
            if future.set_running_or_notify_cancel():
                future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return future
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200873
874
# Every task that is still alive, held through weak references.
_all_tasks = weakref.WeakSet()

# The task currently being executed in each running event loop,
# stored as {EventLoop: Task}.
_current_tasks = dict()
881
882
def _register_task(task):
    """Add *task* to the set of all tasks known to asyncio."""
    _all_tasks.add(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200886
887
def _enter_task(loop, task):
    """Record *task* as the one currently executing in *loop*.

    Raises RuntimeError if *loop* already has a current task.
    """
    current_task = _current_tasks.get(loop)
    if current_task is None:
        _current_tasks[loop] = task
        return
    raise RuntimeError(f"Cannot enter into task {task!r} while another "
                       f"task {current_task!r} is being executed.")
894
895
def _leave_task(loop, task):
    """Clear the current-task entry of *loop*, which must be *task*.

    Raises RuntimeError if the task recorded for *loop* is not *task*.
    """
    current_task = _current_tasks.get(loop)
    if current_task is task:
        del _current_tasks[loop]
        return
    raise RuntimeError(f"Leaving task {task!r} does not match "
                       f"the current task {current_task!r}.")
902
903
def _unregister_task(task):
    """Remove *task* from the set of all tasks, if it is present."""
    _all_tasks.discard(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200907
908
# Keep the pure-Python implementations reachable under explicit names,
# since the import below rebinds the unprefixed ones when it succeeds.
_py_register_task = _register_task
_py_unregister_task = _unregister_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task


try:
    from _asyncio import (_all_tasks, _current_tasks,
                          _enter_task, _leave_task,
                          _register_task, _unregister_task)
except ImportError:
    # _asyncio could not be imported: keep the Python definitions.
    pass
else:
    # Expose the versions taken from _asyncio under explicit names too.
    _c_register_task = _register_task
    _c_unregister_task = _unregister_task
    _c_enter_task = _enter_task
    _c_leave_task = _leave_task