blob: 15422da1b3b2e0015c6cd5a33278f4ba5f824ca7 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
Yury Selivanov6370f342017-12-10 18:36:12 -05003__all__ = (
Andrew Svetlovf74ef452017-12-15 07:04:38 +02004 'Task', 'create_task',
Yury Selivanov6370f342017-12-10 18:36:12 -05005 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
Yury Selivanov9edad3c2017-12-11 10:03:48 -05006 'wait', 'wait_for', 'as_completed', 'sleep',
Yury Selivanov6370f342017-12-10 18:36:12 -05007 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
Andrew Svetlov44d1a592017-12-16 21:58:38 +02008 'current_task', 'all_tasks',
9 '_register_task', '_unregister_task', '_enter_task', '_leave_task',
Yury Selivanov6370f342017-12-10 18:36:12 -050010)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import concurrent.futures
Yury Selivanovf23746a2018-01-22 19:11:18 -050013import contextvars
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import functools
15import inspect
Alex Grönholmcca4eec2018-08-09 00:06:47 +030016import itertools
Andrew Svetlov5f841b52017-12-09 00:23:48 +020017import types
Yury Selivanov59eb9a42015-05-11 14:48:38 -040018import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019import weakref
20
Yury Selivanova0c1ba62016-10-28 12:52:37 -040021from . import base_tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020022from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070024from . import exceptions
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020026from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027
Alex Grönholmcca4eec2018-08-09 00:06:47 +030028# Helper to generate new task names
29# This uses itertools.count() instead of a "+= 1" operation because the latter
30# is not thread safe. See bpo-11866 for a longer explanation.
31_task_name_counter = itertools.count(1).__next__
32
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033
Andrew Svetlov44d1a592017-12-16 21:58:38 +020034def current_task(loop=None):
35 """Return a currently executed task."""
36 if loop is None:
37 loop = events.get_running_loop()
38 return _current_tasks.get(loop)
39
40
41def all_tasks(loop=None):
42 """Return a set of all tasks for the loop."""
43 if loop is None:
Yury Selivanov416c1eb2018-05-28 17:54:02 -040044 loop = events.get_running_loop()
Andrew Svetlov97cf0822018-10-13 21:12:40 +030045 # NB: set(_all_tasks) is required to protect
46 # from https://bugs.python.org/issue34970 bug
47 return {t for t in list(_all_tasks)
Yury Selivanov416c1eb2018-05-28 17:54:02 -040048 if futures._get_loop(t) is loop and not t.done()}
49
50
51def _all_tasks_compat(loop=None):
52 # Different from "all_task()" by returning *all* Tasks, including
53 # the completed ones. Used to implement deprecated "Tasks.all_task()"
54 # method.
55 if loop is None:
Andrew Svetlov44d1a592017-12-16 21:58:38 +020056 loop = events.get_event_loop()
Andrew Svetlov97cf0822018-10-13 21:12:40 +030057 # NB: set(_all_tasks) is required to protect
58 # from https://bugs.python.org/issue34970 bug
59 return {t for t in list(_all_tasks) if futures._get_loop(t) is loop}
Andrew Svetlov44d1a592017-12-16 21:58:38 +020060
61
Alex Grönholmcca4eec2018-08-09 00:06:47 +030062def _set_task_name(task, name):
63 if name is not None:
64 try:
65 set_name = task.set_name
66 except AttributeError:
67 pass
68 else:
69 set_name(name)
70
71
Yury Selivanov0cf16f92017-12-25 10:48:15 -050072class Task(futures._PyFuture): # Inherit Python Task implementation
73 # from a Python Future implementation.
74
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070075 """A coroutine wrapped in a Future."""
76
77 # An important invariant maintained while a Task not done:
78 #
79 # - Either _fut_waiter is None, and _step() is scheduled;
80 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
81 #
82 # The only transition from the latter to the former is through
83 # _wakeup(). When _fut_waiter is not None, one of its callbacks
84 # must be _wakeup().
85
Victor Stinnerfe22e092014-12-04 23:00:13 +010086 # If False, don't log a message if the task is destroyed whereas its
87 # status is still pending
88 _log_destroy_pending = True
89
Guido van Rossum1a605ed2013-12-06 12:57:40 -080090 @classmethod
91 def current_task(cls, loop=None):
92 """Return the currently running task in an event loop or None.
93
94 By default the current task for the current event loop is returned.
95
96 None is returned when called not in the context of a Task.
97 """
Andrew Svetlov44d1a592017-12-16 21:58:38 +020098 warnings.warn("Task.current_task() is deprecated, "
99 "use asyncio.current_task() instead",
100 PendingDeprecationWarning,
101 stacklevel=2)
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800102 if loop is None:
103 loop = events.get_event_loop()
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200104 return current_task(loop)
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800105
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700106 @classmethod
107 def all_tasks(cls, loop=None):
108 """Return a set of all tasks for an event loop.
109
110 By default all tasks for the current event loop are returned.
111 """
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200112 warnings.warn("Task.all_tasks() is deprecated, "
113 "use asyncio.all_tasks() instead",
114 PendingDeprecationWarning,
115 stacklevel=2)
Yury Selivanov416c1eb2018-05-28 17:54:02 -0400116 return _all_tasks_compat(loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700117
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300118 def __init__(self, coro, *, loop=None, name=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700119 super().__init__(loop=loop)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200120 if self._source_traceback:
121 del self._source_traceback[-1]
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200122 if not coroutines.iscoroutine(coro):
123 # raise after Future.__init__(), attrs are required for __del__
124 # prevent logging for pending task in __del__
125 self._log_destroy_pending = False
126 raise TypeError(f"a coroutine was expected, got {coro!r}")
127
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300128 if name is None:
129 self._name = f'Task-{_task_name_counter()}'
130 else:
131 self._name = str(name)
132
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700133 self._must_cancel = False
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200134 self._fut_waiter = None
135 self._coro = coro
Yury Selivanovf23746a2018-01-22 19:11:18 -0500136 self._context = contextvars.copy_context()
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200137
Yury Selivanov22feeb82018-01-24 11:31:01 -0500138 self._loop.call_soon(self.__step, context=self._context)
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500139 _register_task(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700140
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900141 def __del__(self):
142 if self._state == futures._PENDING and self._log_destroy_pending:
143 context = {
144 'task': self,
145 'message': 'Task was destroyed but it is pending!',
146 }
147 if self._source_traceback:
148 context['source_traceback'] = self._source_traceback
149 self._loop.call_exception_handler(context)
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500150 super().__del__()
Victor Stinnera02f81f2014-06-24 22:37:53 +0200151
Victor Stinner313a9802014-07-29 12:58:23 +0200152 def _repr_info(self):
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400153 return base_tasks._task_repr_info(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700154
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300155 def get_name(self):
156 return self._name
157
158 def set_name(self, value):
159 self._name = str(value)
160
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500161 def set_result(self, result):
162 raise RuntimeError('Task does not support set_result operation')
163
164 def set_exception(self, exception):
165 raise RuntimeError('Task does not support set_exception operation')
166
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700167 def get_stack(self, *, limit=None):
168 """Return the list of stack frames for this task's coroutine.
169
Victor Stinnerd87de832014-12-02 17:57:04 +0100170 If the coroutine is not done, this returns the stack where it is
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700171 suspended. If the coroutine has completed successfully or was
172 cancelled, this returns an empty list. If the coroutine was
173 terminated by an exception, this returns the list of traceback
174 frames.
175
176 The frames are always ordered from oldest to newest.
177
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500178 The optional limit gives the maximum number of frames to
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700179 return; by default all available frames are returned. Its
180 meaning differs depending on whether a stack or a traceback is
181 returned: the newest frames of a stack are returned, but the
182 oldest frames of a traceback are returned. (This matches the
183 behavior of the traceback module.)
184
185 For reasons beyond our control, only one stack frame is
186 returned for a suspended coroutine.
187 """
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400188 return base_tasks._task_get_stack(self, limit)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700189
190 def print_stack(self, *, limit=None, file=None):
191 """Print the stack or traceback for this task's coroutine.
192
193 This produces output similar to that of the traceback module,
194 for the frames retrieved by get_stack(). The limit argument
195 is passed to get_stack(). The file argument is an I/O stream
R David Murray8e069d52014-09-24 13:13:45 -0400196 to which the output is written; by default output is written
197 to sys.stderr.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700198 """
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400199 return base_tasks._task_print_stack(self, limit, file)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700200
201 def cancel(self):
R David Murray8e069d52014-09-24 13:13:45 -0400202 """Request that this task cancel itself.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200203
Victor Stinner8d213572014-06-02 23:06:46 +0200204 This arranges for a CancelledError to be thrown into the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200205 wrapped coroutine on the next cycle through the event loop.
206 The coroutine then has a chance to clean up or even deny
207 the request using try/except/finally.
208
R David Murray8e069d52014-09-24 13:13:45 -0400209 Unlike Future.cancel, this does not guarantee that the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200210 task will be cancelled: the exception might be caught and
R David Murray8e069d52014-09-24 13:13:45 -0400211 acted upon, delaying cancellation of the task or preventing
212 cancellation completely. The task may also return a value or
213 raise a different exception.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200214
215 Immediately after this method is called, Task.cancelled() will
216 not return True (unless the task was already cancelled). A
217 task will be marked as cancelled when the wrapped coroutine
218 terminates with a CancelledError exception (even if cancel()
219 was not called).
220 """
Yury Selivanov7ce1c6f2017-06-11 13:49:18 +0000221 self._log_traceback = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700222 if self.done():
223 return False
224 if self._fut_waiter is not None:
225 if self._fut_waiter.cancel():
226 # Leave self._fut_waiter; it may be a Task that
227 # catches and ignores the cancellation so we may have
228 # to cancel it again later.
229 return True
Yury Selivanov22feeb82018-01-24 11:31:01 -0500230 # It must be the case that self.__step is already scheduled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700231 self._must_cancel = True
232 return True
233
Yury Selivanov22feeb82018-01-24 11:31:01 -0500234 def __step(self, exc=None):
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500235 if self.done():
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700236 raise exceptions.InvalidStateError(
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500237 f'_step(): already done: {self!r}, {exc!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700238 if self._must_cancel:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700239 if not isinstance(exc, exceptions.CancelledError):
240 exc = exceptions.CancelledError()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700241 self._must_cancel = False
242 coro = self._coro
243 self._fut_waiter = None
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800244
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200245 _enter_task(self._loop, self)
Yury Selivanovd59bba82015-11-20 12:41:03 -0500246 # Call either coro.throw(exc) or coro.send(None).
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700247 try:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500248 if exc is None:
249 # We use the `send` method directly, because coroutines
250 # don't have `__iter__` and `__next__` methods.
251 result = coro.send(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700252 else:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500253 result = coro.throw(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700254 except StopIteration as exc:
INADA Naoki991adca2017-05-11 21:18:38 +0900255 if self._must_cancel:
256 # Task is cancelled right before coro stops.
257 self._must_cancel = False
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700258 super().set_exception(exceptions.CancelledError())
INADA Naoki991adca2017-05-11 21:18:38 +0900259 else:
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500260 super().set_result(exc.value)
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700261 except exceptions.CancelledError:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700262 super().cancel() # I.e., Future.cancel(self).
263 except Exception as exc:
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500264 super().set_exception(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700265 except BaseException as exc:
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500266 super().set_exception(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700267 raise
268 else:
Guido van Rossum1140a032016-09-09 12:54:54 -0700269 blocking = getattr(result, '_asyncio_future_blocking', None)
270 if blocking is not None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700271 # Yielded Future must come from Future.__iter__().
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500272 if futures._get_loop(result) is not self._loop:
Yury Selivanov6370f342017-12-10 18:36:12 -0500273 new_exc = RuntimeError(
274 f'Task {self!r} got Future '
275 f'{result!r} attached to a different loop')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500276 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500277 self.__step, new_exc, context=self._context)
Guido van Rossum1140a032016-09-09 12:54:54 -0700278 elif blocking:
Yury Selivanov4145c832016-10-09 12:19:12 -0400279 if result is self:
Yury Selivanov6370f342017-12-10 18:36:12 -0500280 new_exc = RuntimeError(
281 f'Task cannot await on itself: {self!r}')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500282 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500283 self.__step, new_exc, context=self._context)
Yury Selivanov4145c832016-10-09 12:19:12 -0400284 else:
285 result._asyncio_future_blocking = False
Yury Selivanovf23746a2018-01-22 19:11:18 -0500286 result.add_done_callback(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500287 self.__wakeup, context=self._context)
Yury Selivanov4145c832016-10-09 12:19:12 -0400288 self._fut_waiter = result
289 if self._must_cancel:
290 if self._fut_waiter.cancel():
291 self._must_cancel = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700292 else:
Yury Selivanov6370f342017-12-10 18:36:12 -0500293 new_exc = RuntimeError(
294 f'yield was used instead of yield from '
295 f'in task {self!r} with {result!r}')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500296 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500297 self.__step, new_exc, context=self._context)
Yury Selivanov6370f342017-12-10 18:36:12 -0500298
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700299 elif result is None:
300 # Bare yield relinquishes control for one event loop iteration.
Yury Selivanov22feeb82018-01-24 11:31:01 -0500301 self._loop.call_soon(self.__step, context=self._context)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700302 elif inspect.isgenerator(result):
303 # Yielding a generator is just wrong.
Yury Selivanov6370f342017-12-10 18:36:12 -0500304 new_exc = RuntimeError(
305 f'yield was used instead of yield from for '
Serhiy Storchaka66553542018-05-20 16:30:31 +0300306 f'generator in task {self!r} with {result!r}')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500307 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500308 self.__step, new_exc, context=self._context)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700309 else:
310 # Yielding something else is an error.
Yury Selivanov6370f342017-12-10 18:36:12 -0500311 new_exc = RuntimeError(f'Task got bad yield: {result!r}')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500312 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500313 self.__step, new_exc, context=self._context)
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800314 finally:
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200315 _leave_task(self._loop, self)
Victor Stinnerd74ac822014-03-04 23:07:08 +0100316 self = None # Needed to break cycles when an exception occurs.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700317
Yury Selivanov22feeb82018-01-24 11:31:01 -0500318 def __wakeup(self, future):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700319 try:
Yury Selivanova4afc482015-11-16 15:12:10 -0500320 future.result()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700321 except Exception as exc:
322 # This may also be a cancellation.
Yury Selivanov22feeb82018-01-24 11:31:01 -0500323 self.__step(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700324 else:
Yury Selivanova4afc482015-11-16 15:12:10 -0500325 # Don't pass the value of `future.result()` explicitly,
326 # as `Future.__iter__` and `Future.__await__` don't need it.
327 # If we call `_step(value, None)` instead of `_step()`,
328 # Python eval loop would use `.send(value)` method call,
329 # instead of `__next__()`, which is slower for futures
330 # that return non-generator iterators from their `__iter__`.
Yury Selivanov22feeb82018-01-24 11:31:01 -0500331 self.__step()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700332 self = None # Needed to break cycles when an exception occurs.
333
334
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400335_PyTask = Task
336
337
338try:
339 import _asyncio
340except ImportError:
341 pass
342else:
343 # _CTask is needed for tests.
344 Task = _CTask = _asyncio.Task
345
346
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300347def create_task(coro, *, name=None):
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200348 """Schedule the execution of a coroutine object in a spawn task.
349
350 Return a Task object.
351 """
352 loop = events.get_running_loop()
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300353 task = loop.create_task(coro)
354 _set_task_name(task, name)
355 return task
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200356
357
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700358# wait() and as_completed() similar to those in PEP 3148.
359
360FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
361FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
362ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
363
364
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200365async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700366 """Wait for the Futures and coroutines given by fs to complete.
367
Victor Stinnerdb74d982014-06-10 11:16:05 +0200368 The sequence futures must not be empty.
369
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700370 Coroutines will be wrapped in Tasks.
371
372 Returns two sets of Future: (done, pending).
373
374 Usage:
375
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200376 done, pending = await asyncio.wait(fs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700377
378 Note: This does not raise TimeoutError! Futures that aren't done
379 when the timeout occurs are returned in the second set.
380 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700381 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Yury Selivanov6370f342017-12-10 18:36:12 -0500382 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700383 if not fs:
384 raise ValueError('Set of coroutines/Futures is empty.')
Victor Stinnere931f7b2014-07-16 18:50:39 +0200385 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
Yury Selivanov6370f342017-12-10 18:36:12 -0500386 raise ValueError(f'Invalid return_when value: {return_when}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700387
388 if loop is None:
João Júnior558c49b2018-09-24 06:51:22 -0300389 loop = events.get_running_loop()
390 else:
Yury Selivanovfad6af22018-09-25 17:44:52 -0400391 warnings.warn("The loop argument is deprecated and scheduled for "
392 "removal in Python 3.10.",
João Júnior558c49b2018-09-24 06:51:22 -0300393 DeprecationWarning, stacklevel=2)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700394
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400395 fs = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700396
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200397 return await _wait(fs, timeout, return_when, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700398
399
Victor Stinner59e08022014-08-28 11:19:25 +0200400def _release_waiter(waiter, *args):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700401 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200402 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700403
404
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200405async def wait_for(fut, timeout, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700406 """Wait for the single Future or coroutine to complete, with timeout.
407
408 Coroutine will be wrapped in Task.
409
Victor Stinner421e49b2014-01-23 17:40:59 +0100410 Returns result of the Future or coroutine. When a timeout occurs,
411 it cancels the task and raises TimeoutError. To avoid the task
412 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700413
Victor Stinner922bc2c2015-01-15 16:29:10 +0100414 If the wait is cancelled, the task is also cancelled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700415
Victor Stinner922bc2c2015-01-15 16:29:10 +0100416 This function is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700417 """
418 if loop is None:
João Júnior558c49b2018-09-24 06:51:22 -0300419 loop = events.get_running_loop()
420 else:
Yury Selivanovfad6af22018-09-25 17:44:52 -0400421 warnings.warn("The loop argument is deprecated and scheduled for "
422 "removal in Python 3.10.",
João Júnior558c49b2018-09-24 06:51:22 -0300423 DeprecationWarning, stacklevel=2)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700424
Guido van Rossum48c66c32014-01-29 14:30:38 -0800425 if timeout is None:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200426 return await fut
Guido van Rossum48c66c32014-01-29 14:30:38 -0800427
Victor K4d071892017-10-05 19:04:39 +0300428 if timeout <= 0:
429 fut = ensure_future(fut, loop=loop)
430
431 if fut.done():
432 return fut.result()
433
434 fut.cancel()
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700435 raise exceptions.TimeoutError()
Victor K4d071892017-10-05 19:04:39 +0300436
Yury Selivanov7661db62016-05-16 15:38:39 -0400437 waiter = loop.create_future()
Victor Stinner59e08022014-08-28 11:19:25 +0200438 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
439 cb = functools.partial(_release_waiter, waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700440
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400441 fut = ensure_future(fut, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700442 fut.add_done_callback(cb)
443
444 try:
Victor Stinner59e08022014-08-28 11:19:25 +0200445 # wait until the future completes or the timeout
Victor Stinner922bc2c2015-01-15 16:29:10 +0100446 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200447 await waiter
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700448 except exceptions.CancelledError:
Victor Stinner922bc2c2015-01-15 16:29:10 +0100449 fut.remove_done_callback(cb)
450 fut.cancel()
451 raise
Victor Stinner59e08022014-08-28 11:19:25 +0200452
453 if fut.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700454 return fut.result()
455 else:
456 fut.remove_done_callback(cb)
Elvis Pranskevichuse2b340a2018-05-29 17:31:01 -0400457 # We must ensure that the task is not running
458 # after wait_for() returns.
459 # See https://bugs.python.org/issue32751
460 await _cancel_and_wait(fut, loop=loop)
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700461 raise exceptions.TimeoutError()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700462 finally:
463 timeout_handle.cancel()
464
465
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200466async def _wait(fs, timeout, return_when, loop):
Elvis Pranskevichuse2b340a2018-05-29 17:31:01 -0400467 """Internal helper for wait().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700468
469 The fs argument must be a collection of Futures.
470 """
471 assert fs, 'Set of Futures is empty.'
Yury Selivanov7661db62016-05-16 15:38:39 -0400472 waiter = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700473 timeout_handle = None
474 if timeout is not None:
475 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
476 counter = len(fs)
477
478 def _on_completion(f):
479 nonlocal counter
480 counter -= 1
481 if (counter <= 0 or
482 return_when == FIRST_COMPLETED or
483 return_when == FIRST_EXCEPTION and (not f.cancelled() and
484 f.exception() is not None)):
485 if timeout_handle is not None:
486 timeout_handle.cancel()
487 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200488 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700489
490 for f in fs:
491 f.add_done_callback(_on_completion)
492
493 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200494 await waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700495 finally:
496 if timeout_handle is not None:
497 timeout_handle.cancel()
498
499 done, pending = set(), set()
500 for f in fs:
501 f.remove_done_callback(_on_completion)
502 if f.done():
503 done.add(f)
504 else:
505 pending.add(f)
506 return done, pending
507
508
Elvis Pranskevichuse2b340a2018-05-29 17:31:01 -0400509async def _cancel_and_wait(fut, loop):
510 """Cancel the *fut* future or task and wait until it completes."""
511
512 waiter = loop.create_future()
513 cb = functools.partial(_release_waiter, waiter)
514 fut.add_done_callback(cb)
515
516 try:
517 fut.cancel()
518 # We cannot wait on *fut* directly to make
519 # sure _cancel_and_wait itself is reliably cancellable.
520 await waiter
521 finally:
522 fut.remove_done_callback(cb)
523
524
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700525# This is *not* a @coroutine! It is just an iterator (yielding Futures).
526def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800527 """Return an iterator whose values are coroutines.
528
529 When waiting for the yielded coroutines you'll get the results (or
530 exceptions!) of the original Futures (or coroutines), in the order
531 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700532
533 This differs from PEP 3148; the proper way to use this is:
534
535 for f in as_completed(fs):
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200536 result = await f # The 'await' may raise.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700537 # Use result.
538
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200539 If a timeout is specified, the 'await' will raise
Guido van Rossumb58f0532014-02-12 17:58:19 -0800540 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700541
542 Note: The futures 'f' are not necessarily members of fs.
543 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700544 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Yury Selivanov6370f342017-12-10 18:36:12 -0500545 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700546 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400547 todo = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800548 from .queues import Queue # Import here to avoid circular import problem.
549 done = Queue(loop=loop)
550 timeout_handle = None
551
552 def _on_timeout():
553 for f in todo:
554 f.remove_done_callback(_on_completion)
555 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
556 todo.clear() # Can't do todo.remove(f) in the loop.
557
558 def _on_completion(f):
559 if not todo:
560 return # _on_timeout() was here first.
561 todo.remove(f)
562 done.put_nowait(f)
563 if not todo and timeout_handle is not None:
564 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700565
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200566 async def _wait_for_one():
567 f = await done.get()
Guido van Rossumb58f0532014-02-12 17:58:19 -0800568 if f is None:
569 # Dummy value from _on_timeout().
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700570 raise exceptions.TimeoutError
Guido van Rossumb58f0532014-02-12 17:58:19 -0800571 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700572
Guido van Rossumb58f0532014-02-12 17:58:19 -0800573 for f in todo:
574 f.add_done_callback(_on_completion)
575 if todo and timeout is not None:
576 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700577 for _ in range(len(todo)):
578 yield _wait_for_one()
579
580
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200581@types.coroutine
582def __sleep0():
583 """Skip one event loop run cycle.
584
585 This is a private helper for 'asyncio.sleep()', used
586 when the 'delay' is set to 0. It uses a bare 'yield'
Yury Selivanov22feeb82018-01-24 11:31:01 -0500587 expression (which Task.__step knows how to handle)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200588 instead of creating a Future object.
589 """
590 yield
591
592
593async def sleep(delay, result=None, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700594 """Coroutine that completes after a given time (in seconds)."""
Andrew Svetlov5382c052017-12-17 16:41:30 +0200595 if delay <= 0:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200596 await __sleep0()
Yury Selivanovade04122015-11-05 14:29:04 -0500597 return result
598
Yury Selivanov7661db62016-05-16 15:38:39 -0400599 if loop is None:
João Júnior558c49b2018-09-24 06:51:22 -0300600 loop = events.get_running_loop()
601 else:
Yury Selivanovfad6af22018-09-25 17:44:52 -0400602 warnings.warn("The loop argument is deprecated and scheduled for "
603 "removal in Python 3.10.",
João Júnior558c49b2018-09-24 06:51:22 -0300604 DeprecationWarning, stacklevel=2)
605
Yury Selivanov7661db62016-05-16 15:38:39 -0400606 future = loop.create_future()
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500607 h = loop.call_later(delay,
608 futures._set_result_unless_cancelled,
609 future, result)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700610 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200611 return await future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700612 finally:
613 h.cancel()
614
615
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400616def ensure_future(coro_or_future, *, loop=None):
Yury Selivanov620279b2015-10-02 15:00:19 -0400617 """Wrap a coroutine or an awaitable in a future.
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400618
619 If the argument is a Future, it is returned directly.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700620 """
jimmylaie549c4b2018-05-28 06:42:05 -1000621 if coroutines.iscoroutine(coro_or_future):
Victor Stinner896a25a2014-07-08 11:29:25 +0200622 if loop is None:
623 loop = events.get_event_loop()
624 task = loop.create_task(coro_or_future)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200625 if task._source_traceback:
626 del task._source_traceback[-1]
627 return task
jimmylaie549c4b2018-05-28 06:42:05 -1000628 elif futures.isfuture(coro_or_future):
629 if loop is not None and loop is not futures._get_loop(coro_or_future):
630 raise ValueError('loop argument must agree with Future')
631 return coro_or_future
Victor Stinner3f438a92017-11-28 14:43:52 +0100632 elif inspect.isawaitable(coro_or_future):
Yury Selivanov620279b2015-10-02 15:00:19 -0400633 return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700634 else:
Charles Renwickae5b3262017-04-21 16:49:48 -0400635 raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
636 'required')
Yury Selivanov620279b2015-10-02 15:00:19 -0400637
638
639@coroutine
640def _wrap_awaitable(awaitable):
641 """Helper for asyncio.ensure_future().
642
643 Wraps awaitable (an object with __await__) into a coroutine
644 that will later be wrapped in a Task by ensure_future().
645 """
646 return (yield from awaitable.__await__())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700647
648
649class _GatheringFuture(futures.Future):
650 """Helper for gather().
651
652 This overrides cancel() to cancel all the children and act more
653 like Task.cancel(), which doesn't immediately mark itself as
654 cancelled.
655 """
656
657 def __init__(self, children, *, loop=None):
658 super().__init__(loop=loop)
659 self._children = children
Yury Selivanov863b6742018-05-29 17:20:02 -0400660 self._cancel_requested = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700661
662 def cancel(self):
663 if self.done():
664 return False
Yury Selivanov3d676152016-10-21 17:22:17 -0400665 ret = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700666 for child in self._children:
Yury Selivanov3d676152016-10-21 17:22:17 -0400667 if child.cancel():
668 ret = True
Yury Selivanov863b6742018-05-29 17:20:02 -0400669 if ret:
670 # If any child tasks were actually cancelled, we should
671 # propagate the cancellation request regardless of
672 # *return_exceptions* argument. See issue 32684.
673 self._cancel_requested = True
Yury Selivanov3d676152016-10-21 17:22:17 -0400674 return ret
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700675
676
677def gather(*coros_or_futures, loop=None, return_exceptions=False):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500678 """Return a future aggregating results from the given coroutines/futures.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700679
Guido van Rossume3c65a72016-09-30 08:17:15 -0700680 Coroutines will be wrapped in a future and scheduled in the event
681 loop. They will not necessarily be scheduled in the same order as
682 passed in.
683
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700684 All futures must share the same event loop. If all the tasks are
685 done successfully, the returned future's result is the list of
686 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500687 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700688 exceptions in the tasks are treated the same as successful
689 results, and gathered in the result list; otherwise, the first
690 raised exception will be immediately propagated to the returned
691 future.
692
693 Cancellation: if the outer Future is cancelled, all children (that
694 have not completed yet) are also cancelled. If any child is
695 cancelled, this is treated as if it raised CancelledError --
696 the outer Future is *not* cancelled in this case. (This is to
697 prevent the cancellation of one child to cause other children to
698 be cancelled.)
699 """
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200700 if not coros_or_futures:
Yury Selivanov7661db62016-05-16 15:38:39 -0400701 if loop is None:
702 loop = events.get_event_loop()
703 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700704 outer.set_result([])
705 return outer
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200706
Yury Selivanov36c2c042017-12-19 07:19:53 -0500707 def _done_callback(fut):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700708 nonlocal nfinished
Yury Selivanov36c2c042017-12-19 07:19:53 -0500709 nfinished += 1
710
Victor Stinner3531d902015-01-09 01:42:52 +0100711 if outer.done():
712 if not fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700713 # Mark exception retrieved.
714 fut.exception()
715 return
Victor Stinner3531d902015-01-09 01:42:52 +0100716
Yury Selivanov36c2c042017-12-19 07:19:53 -0500717 if not return_exceptions:
718 if fut.cancelled():
719 # Check if 'fut' is cancelled first, as
720 # 'fut.exception()' will *raise* a CancelledError
721 # instead of returning it.
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700722 exc = exceptions.CancelledError()
Yury Selivanov36c2c042017-12-19 07:19:53 -0500723 outer.set_exception(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700724 return
Yury Selivanov36c2c042017-12-19 07:19:53 -0500725 else:
726 exc = fut.exception()
727 if exc is not None:
728 outer.set_exception(exc)
729 return
730
731 if nfinished == nfuts:
732 # All futures are done; create a list of results
733 # and set it to the 'outer' future.
734 results = []
735
736 for fut in children:
737 if fut.cancelled():
738 # Check if 'fut' is cancelled first, as
739 # 'fut.exception()' will *raise* a CancelledError
740 # instead of returning it.
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700741 res = exceptions.CancelledError()
Yury Selivanov36c2c042017-12-19 07:19:53 -0500742 else:
743 res = fut.exception()
744 if res is None:
745 res = fut.result()
746 results.append(res)
747
Yury Selivanov863b6742018-05-29 17:20:02 -0400748 if outer._cancel_requested:
749 # If gather is being cancelled we must propagate the
750 # cancellation regardless of *return_exceptions* argument.
751 # See issue 32684.
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700752 outer.set_exception(exceptions.CancelledError())
Yury Selivanov863b6742018-05-29 17:20:02 -0400753 else:
754 outer.set_result(results)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700755
Yury Selivanov36c2c042017-12-19 07:19:53 -0500756 arg_to_fut = {}
757 children = []
758 nfuts = 0
759 nfinished = 0
760 for arg in coros_or_futures:
761 if arg not in arg_to_fut:
762 fut = ensure_future(arg, loop=loop)
763 if loop is None:
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500764 loop = futures._get_loop(fut)
Yury Selivanov36c2c042017-12-19 07:19:53 -0500765 if fut is not arg:
766 # 'arg' was not a Future, therefore, 'fut' is a new
767 # Future created specifically for 'arg'. Since the caller
768 # can't control it, disable the "destroy pending task"
769 # warning.
770 fut._log_destroy_pending = False
771
772 nfuts += 1
773 arg_to_fut[arg] = fut
774 fut.add_done_callback(_done_callback)
775
776 else:
777 # There's a duplicate Future object in coros_or_futures.
778 fut = arg_to_fut[arg]
779
780 children.append(fut)
781
782 outer = _GatheringFuture(children, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700783 return outer
784
785
786def shield(arg, *, loop=None):
787 """Wait for a future, shielding it from cancellation.
788
789 The statement
790
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200791 res = await shield(something())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700792
793 is exactly equivalent to the statement
794
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200795 res = await something()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700796
797 *except* that if the coroutine containing it is cancelled, the
798 task running in something() is not cancelled. From the POV of
799 something(), the cancellation did not happen. But its caller is
800 still cancelled, so the yield-from expression still raises
801 CancelledError. Note: If something() is cancelled by other means
802 this will still cancel shield().
803
804 If you want to completely ignore cancellation (not recommended)
805 you can combine shield() with a try/except clause, as follows:
806
807 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200808 res = await shield(something())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700809 except CancelledError:
810 res = None
811 """
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400812 inner = ensure_future(arg, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700813 if inner.done():
814 # Shortcut.
815 return inner
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500816 loop = futures._get_loop(inner)
Yury Selivanov7661db62016-05-16 15:38:39 -0400817 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700818
819 def _done_callback(inner):
820 if outer.cancelled():
Victor Stinner3531d902015-01-09 01:42:52 +0100821 if not inner.cancelled():
822 # Mark inner's result as retrieved.
823 inner.exception()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700824 return
Victor Stinner3531d902015-01-09 01:42:52 +0100825
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700826 if inner.cancelled():
827 outer.cancel()
828 else:
829 exc = inner.exception()
830 if exc is not None:
831 outer.set_exception(exc)
832 else:
833 outer.set_result(inner.result())
834
835 inner.add_done_callback(_done_callback)
836 return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700837
838
839def run_coroutine_threadsafe(coro, loop):
840 """Submit a coroutine object to a given event loop.
841
842 Return a concurrent.futures.Future to access the result.
843 """
844 if not coroutines.iscoroutine(coro):
845 raise TypeError('A coroutine object is required')
846 future = concurrent.futures.Future()
847
848 def callback():
Guido van Rossum601953b2015-10-05 16:20:00 -0700849 try:
850 futures._chain_future(ensure_future(coro, loop=loop), future)
851 except Exception as exc:
852 if future.set_running_or_notify_cancel():
853 future.set_exception(exc)
854 raise
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700855
856 loop.call_soon_threadsafe(callback)
857 return future
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200858
859
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500860# WeakSet containing all alive tasks.
861_all_tasks = weakref.WeakSet()
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200862
863# Dictionary containing tasks that are currently active in
864# all running event loops. {EventLoop: Task}
865_current_tasks = {}
866
867
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500868def _register_task(task):
869 """Register a new task in asyncio as executed by loop."""
870 _all_tasks.add(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200871
872
873def _enter_task(loop, task):
874 current_task = _current_tasks.get(loop)
875 if current_task is not None:
876 raise RuntimeError(f"Cannot enter into task {task!r} while another "
877 f"task {current_task!r} is being executed.")
878 _current_tasks[loop] = task
879
880
881def _leave_task(loop, task):
882 current_task = _current_tasks.get(loop)
883 if current_task is not task:
884 raise RuntimeError(f"Leaving task {task!r} does not match "
885 f"the current task {current_task!r}.")
886 del _current_tasks[loop]
887
888
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500889def _unregister_task(task):
890 """Unregister a task."""
891 _all_tasks.discard(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200892
893
894_py_register_task = _register_task
895_py_unregister_task = _unregister_task
896_py_enter_task = _enter_task
897_py_leave_task = _leave_task
898
899
900try:
901 from _asyncio import (_register_task, _unregister_task,
902 _enter_task, _leave_task,
903 _all_tasks, _current_tasks)
904except ImportError:
905 pass
906else:
907 _c_register_task = _register_task
908 _c_unregister_task = _unregister_task
909 _c_enter_task = _enter_task
910 _c_leave_task = _leave_task