# Source blob: 743e82baff7aac0692a85d33398069126fcfb2c4
"""Support for tasks, coroutines and the scheduler."""
2
# Names exported by this module.  Besides the public API, the four
# private task-bookkeeping hooks at the end are exported as well.
__all__ = (
    'Task',
    'create_task',
    'FIRST_COMPLETED',
    'FIRST_EXCEPTION',
    'ALL_COMPLETED',
    'wait',
    'wait_for',
    'as_completed',
    'sleep',
    'gather',
    'shield',
    'ensure_future',
    'run_coroutine_threadsafe',
    'current_task',
    'all_tasks',
    '_register_task',
    '_unregister_task',
    '_enter_task',
    '_leave_task',
)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import concurrent.futures
Yury Selivanovf23746a2018-01-22 19:11:18 -050013import contextvars
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import functools
15import inspect
Alex Grönholmcca4eec2018-08-09 00:06:47 +030016import itertools
Andrew Svetlov5f841b52017-12-09 00:23:48 +020017import types
Yury Selivanov59eb9a42015-05-11 14:48:38 -040018import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019import weakref
20
Yury Selivanova0c1ba62016-10-28 12:52:37 -040021from . import base_tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020022from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070024from . import exceptions
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020026from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027
# Helper that hands out the numbers used in default task names.
# It is built on itertools.count() rather than on a hand-written "+= 1"
# counter because the latter is not thread safe.  See bpo-11866 for a
# longer explanation.
_task_name_counter = itertools.count(1).__next__
32
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033
def current_task(loop=None):
    """Return a currently executed task.

    When *loop* is omitted, the running event loop is obtained from
    events.get_running_loop().  The result is whatever
    ``_current_tasks.get(loop)`` yields for that loop.
    """
    target_loop = events.get_running_loop() if loop is None else loop
    return _current_tasks.get(target_loop)
39
40
def all_tasks(loop=None):
    """Return a set of all tasks for the loop.

    Only tasks that belong to *loop* and are not done yet are included.
    When *loop* is omitted, the running event loop is used.
    """
    if loop is None:
        loop = events.get_running_loop()
    unfinished = set()
    for task in _all_tasks:
        if futures._get_loop(task) is loop and not task.done():
            unfinished.add(task)
    return unfinished
47
48
def _all_tasks_compat(loop=None):
    # Differs from all_tasks() in that it returns *all* Tasks of the
    # loop, including the completed ones.  Used to implement the
    # deprecated Task.all_tasks() classmethod.
    target_loop = events.get_event_loop() if loop is None else loop
    return {
        task for task in _all_tasks
        if futures._get_loop(task) is target_loop
    }
Andrew Svetlov44d1a592017-12-16 21:58:38 +020056
57
def _set_task_name(task, name):
    """Give *task* the name *name*, if one was supplied.

    Nothing is done when *name* is None, or when *task* has no
    ``set_name`` attribute.
    """
    if name is None:
        return
    try:
        apply_name = task.set_name
    except AttributeError:
        # The task object offers no set_name(): leave it unnamed.
        return
    apply_name(name)
66
67
class Task(futures._PyFuture):
    """A coroutine wrapped in a Future.

    This is the Python Task implementation; it inherits from the Python
    Future implementation (futures._PyFuture).
    """

    # Invariant maintained while a Task is not done:
    #
    # - either _fut_waiter is None, and __step() is scheduled;
    # - or _fut_waiter is some Future, and __step() is *not* scheduled.
    #
    # The only transition from the latter to the former goes through
    # __wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be __wakeup().

    # When False, no message is logged if the task gets destroyed while
    # its status is still pending.
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        Deprecated: emits a PendingDeprecationWarning that points to
        asyncio.current_task().

        By default the current task for the current event loop is
        returned.  None is returned when called not in the context of
        a Task.
        """
        warnings.warn("Task.current_task() is deprecated, "
                      "use asyncio.current_task() instead",
                      PendingDeprecationWarning,
                      stacklevel=2)
        target_loop = events.get_event_loop() if loop is None else loop
        return current_task(target_loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        Deprecated: emits a PendingDeprecationWarning that points to
        asyncio.all_tasks().

        By default all tasks for the current event loop are returned.
        """
        warnings.warn("Task.all_tasks() is deprecated, "
                      "use asyncio.all_tasks() instead",
                      PendingDeprecationWarning,
                      stacklevel=2)
        return _all_tasks_compat(loop)

    def __init__(self, coro, *, loop=None, name=None):
        """Wrap *coro*, schedule its first step and register the task.

        *name* becomes the task name (converted with str()); when it is
        None a 'Task-<n>' name is generated.  TypeError is raised when
        coroutines.iscoroutine(coro) is false.
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # Raise only after Future.__init__() has run: __del__ needs
            # the attributes it sets.  Also keep __del__ from logging
            # this half-built task as a destroyed pending task.
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        self._name = (
            f'Task-{_task_name_counter()}' if name is None else str(name))

        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    def __del__(self):
        # Report a task that is garbage collected while still pending,
        # unless logging was disabled through _log_destroy_pending.
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    def _repr_info(self):
        return base_tasks._task_repr_info(self)

    def get_name(self):
        """Return the name of the task."""
        return self._name

    def set_name(self, value):
        """Set the name of the task to str(value)."""
        self._name = str(value)

    def set_result(self, result):
        """Not supported for tasks: always raises RuntimeError."""
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        """Not supported for tasks: always raises RuntimeError."""
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None and self._fut_waiter.cancel():
            # Leave self._fut_waiter; it may be a Task that
            # catches and ignores the cancellation so we may have
            # to cancel it again later.
            return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        return True

    def __step(self, exc=None):
        # Advance the wrapped coroutine by one step: throw *exc* into it
        # when given, otherwise send None, then act on what it yielded,
        # returned or raised.
        if self.done():
            raise exceptions.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, exceptions.CancelledError):
                exc = exceptions.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is not None:
                result = coro.throw(exc)
            else:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().set_exception(exceptions.CancelledError())
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            super().set_exception(exc)
        except BaseException as exc:
            super().set_exception(exc)
            raise
        else:
            # RuntimeError to throw into the coroutine on its next step,
            # or None when the yielded value was acceptable.
            error = None
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    error = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                elif not blocking:
                    error = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                elif result is self:
                    error = RuntimeError(
                        f'Task cannot await on itself: {self!r}')
                else:
                    # Wait for the yielded future; __wakeup() resumes us.
                    result._asyncio_future_blocking = False
                    result.add_done_callback(
                        self.__wakeup, context=self._context)
                    self._fut_waiter = result
                    if self._must_cancel and self._fut_waiter.cancel():
                        self._must_cancel = False
            elif result is None:
                # Bare yield relinquishes control for one event loop
                # iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                error = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
            else:
                # Yielding something else is an error.
                error = RuntimeError(f'Task got bad yield: {result!r}')

            if error is not None:
                self._loop.call_soon(
                    self.__step, error, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        # Done-callback of the future the task was waiting on: resume
        # the coroutine, forwarding the future's exception if it has one.
        try:
            future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.
329
330
# Keep the pure-Python class reachable under its own name; the public
# name Task is rebound below when the _asyncio module can be imported.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    # _asyncio is not importable: Task stays the Python implementation.
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task
341
342
def create_task(coro, *, name=None):
    """Schedule the execution of a coroutine object in a spawn task.

    The task is created on the running event loop; *name*, when not
    None, is applied to it through _set_task_name().

    Return a Task object.
    """
    new_task = events.get_running_loop().create_task(coro)
    _set_task_name(new_task, name)
    return new_task
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200352
353
# wait() and as_completed() are similar to those in PEP 3148.
# The return_when constants are the ones defined by concurrent.futures.

FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
359
360
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures and coroutines given by fs to complete.

    The sequence futures must not be empty.

    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = await asyncio.wait(fs)

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.

    Raises TypeError when fs is itself a future or a coroutine, and
    ValueError when fs is empty or return_when is not one of
    FIRST_COMPLETED, FIRST_EXCEPTION and ALL_COMPLETED.  Passing loop
    emits a DeprecationWarning.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError(f'Invalid return_when value: {return_when}')

    if loop is not None:
        warnings.warn("The loop argument is deprecated and scheduled for "
                      "removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_running_loop()

    # De-duplicate the inputs first, then make sure each is a future.
    wrapped = {ensure_future(item, loop=loop) for item in set(fs)}

    return await _wait(wrapped, timeout, return_when, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700394
395
def _release_waiter(waiter, *args):
    """Resolve *waiter* with None unless it is already done.

    Extra positional arguments are accepted and ignored, so the function
    can be used both as a call_later() callback and, through
    functools.partial, as a future done-callback.
    """
    if waiter.done():
        return
    waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700399
400
async def wait_for(fut, timeout, *, loop=None):
    """Wait for the single Future or coroutine to complete, with timeout.

    Coroutine will be wrapped in Task.

    Returns result of the Future or coroutine.  When a timeout occurs,
    it cancels the task, waits until the cancellation has been
    processed and raises TimeoutError.  To avoid the task cancellation,
    wrap it in shield().

    A timeout of None waits for fut without any time limit.  A timeout
    that is zero or negative returns the result if fut is already done
    and times out otherwise.

    If the wait is cancelled, the task is also cancelled.

    Passing loop emits a DeprecationWarning.

    This function is a coroutine.
    """
    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated and scheduled for "
                      "removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    if timeout is None:
        return await fut

    if timeout <= 0:
        fut = ensure_future(fut, loop=loop)

        if fut.done():
            return fut.result()

        # Give the same guarantee as the regular timeout path below:
        # the task must not be running after wait_for() returns, so
        # wait for the cancellation instead of only requesting it.
        # See https://bugs.python.org/issue32751
        await _cancel_and_wait(fut, loop=loop)
        raise exceptions.TimeoutError()

    waiter = loop.create_future()
    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    cb = functools.partial(_release_waiter, waiter)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(cb)

    try:
        # wait until the future completes or the timeout
        try:
            await waiter
        except exceptions.CancelledError:
            # The wait itself was cancelled: propagate it to the task.
            fut.remove_done_callback(cb)
            fut.cancel()
            raise

        if fut.done():
            return fut.result()
        else:
            fut.remove_done_callback(cb)
            # We must ensure that the task is not running
            # after wait_for() returns.
            # See https://bugs.python.org/issue32751
            await _cancel_and_wait(fut, loop=loop)
            raise exceptions.TimeoutError()
    finally:
        timeout_handle.cancel()
460
461
async def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a collection of Futures.

    Returns the (done, pending) pair of sets once the return_when
    condition is met or the timeout, if any, expires.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    timeout_handle = (
        None if timeout is None
        else loop.call_later(timeout, _release_waiter, waiter))
    remaining = len(fs)

    def _on_completion(f):
        # Done-callback attached to every future: release the waiter as
        # soon as the return_when condition is satisfied.
        nonlocal remaining
        remaining -= 1
        satisfied = (
            remaining <= 0
            or return_when == FIRST_COMPLETED
            or (return_when == FIRST_EXCEPTION
                and not f.cancelled()
                and f.exception() is not None))
        if not satisfied:
            return
        if timeout_handle is not None:
            timeout_handle.cancel()
        if not waiter.done():
            waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        await waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()

    done, pending = set(), set()
    for f in fs:
        f.remove_done_callback(_on_completion)
        (done if f.done() else pending).add(f)
    return done, pending
503
504
async def _cancel_and_wait(fut, loop):
    """Cancel the *fut* future or task and wait until it completes."""

    released = loop.create_future()
    on_done = functools.partial(_release_waiter, released)
    fut.add_done_callback(on_done)

    try:
        fut.cancel()
        # We cannot wait on *fut* directly to make
        # sure _cancel_and_wait itself is reliably cancellable.
        await released
    finally:
        fut.remove_done_callback(on_done)
519
520
# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    When waiting for the yielded coroutines you'll get the results (or
    exceptions!) of the original Futures (or coroutines), in the order
    in which and as soon as they complete.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = await f  # The 'await' may raise.
            # Use result.

    If a timeout is specified, the 'await' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if loop is None:
        loop = events.get_event_loop()
    todo = {ensure_future(item, loop=loop) for item in set(fs)}
    from .queues import Queue  # Import here to avoid circular import problem.
    done = Queue(loop=loop)
    timeout_handle = None

    def _on_completion(finished):
        if not todo:
            return  # _on_timeout() was here first.
        todo.remove(finished)
        done.put_nowait(finished)
        if not todo and timeout_handle is not None:
            timeout_handle.cancel()

    def _on_timeout():
        for unfinished in todo:
            unfinished.remove_done_callback(_on_completion)
            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
        todo.clear()  # Can't do todo.remove(f) in the loop.

    async def _wait_for_one():
        item = await done.get()
        if item is None:
            # Dummy value from _on_timeout().
            raise exceptions.TimeoutError
        return item.result()  # May raise f.exception().

    for f in todo:
        f.add_done_callback(_on_completion)
    if todo and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    for _ in range(len(todo)):
        yield _wait_for_one()
575
576
@types.coroutine
def __sleep0():
    """Skip one event loop run cycle.

    Private helper for 'asyncio.sleep()', used when the 'delay' is
    zero or negative.  Rather than creating a Future object it relies
    on a bare 'yield' expression, which Task.__step knows how to
    handle.
    """
    yield
587
588
async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    *result* is returned to the caller once the delay has elapsed.  A
    delay that is zero or negative only gives up control for one event
    loop cycle.  Passing loop emits a DeprecationWarning (only on the
    positive-delay path).
    """
    if delay <= 0:
        await __sleep0()
        return result

    if loop is not None:
        warnings.warn("The loop argument is deprecated and scheduled for "
                      "removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_running_loop()

    future = loop.create_future()
    timer = loop.call_later(
        delay, futures._set_result_unless_cancelled, future, result)
    try:
        return await future
    finally:
        # Drop the timer when the wait ends early (e.g. on cancellation).
        timer.cancel()
610
611
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.

    Raises ValueError when a Future is passed together with a loop that
    is not the Future's own loop, and TypeError when the argument is
    neither a coroutine, a Future nor an awaitable.
    """
    if coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task

    if futures.isfuture(coro_or_future):
        if not (loop is None or loop is futures._get_loop(coro_or_future)):
            raise ValueError('loop argument must agree with Future')
        return coro_or_future

    if inspect.isawaitable(coro_or_future):
        # Turn the awaitable into a coroutine, then wrap that in a Task.
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)

    raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                    'required')
Yury Selivanov620279b2015-10-02 15:00:19 -0400633
634
@coroutine
def _wrap_awaitable(awaitable):
    """Helper for asyncio.ensure_future().

    Turns *awaitable* (an object providing ``__await__``) into a
    coroutine by delegating to the iterator that ``__await__()`` returns.
    ensure_future() subsequently wraps that coroutine in a Task.
    """
    outcome = yield from awaitable.__await__()
    return outcome
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700643
644
class _GatheringFuture(futures.Future):
    """Helper for gather().

    The outer future handed back by gather().  Its cancel() forwards the
    request to every child instead of marking this future as cancelled
    immediately, which is closer to how Task.cancel() behaves.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        # Futures this object aggregates; cancel() walks all of them.
        self._children = children
        # Set once a cancel() call has actually cancelled some child.
        self._cancel_requested = False

    def cancel(self):
        """Ask every child to cancel.

        Returns False if this future is already done or if no child
        accepted the cancellation, True otherwise.
        """
        if self.done():
            return False

        # A list (not a generator) on purpose: every child must receive
        # its cancel() call, even after one has already returned True.
        outcomes = [child.cancel() for child in self._children]
        if not any(outcomes):
            return False

        # If any child tasks were actually cancelled, we should
        # propagate the cancellation request regardless of
        # *return_exceptions* argument.  See issue 32684.
        self._cancel_requested = True
        return True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700671
672
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines/futures.

    Each argument is passed through ensure_future(), so coroutines get
    wrapped in a future and scheduled on the event loop; they are not
    necessarily scheduled in the order given.  An argument that appears
    more than once is only wrapped once and shares a single child.

    All futures must share the same event loop.  When every child has
    finished successfully, the returned future's result is the list of
    their results, ordered like the arguments (not by completion time).
    Called with no arguments, the returned future is already done with
    an empty list as its result.

    If *return_exceptions* is True, exceptions raised by children are
    collected in the result list just like ordinary results.  Otherwise
    the first exception raised is set on the returned future as soon as
    it occurs.

    Cancellation: cancelling the outer future cancels all children that
    have not completed yet.  A cancelled child is treated as if it had
    raised CancelledError -- the outer future is *not* cancelled in that
    case, so that cancelling one child does not cancel the others.
    """
    if not coros_or_futures:
        if loop is None:
            loop = events.get_event_loop()
        outer = loop.create_future()
        outer.set_result([])
        return outer

    def _collect_results():
        # One entry per positional argument, in argument order.
        collected = []
        for child in children:
            if child.cancelled():
                # Check for cancellation first: child.exception() would
                # *raise* CancelledError instead of returning it.
                item = exceptions.CancelledError()
            else:
                item = child.exception()
                if item is None:
                    item = child.result()
            collected.append(item)
        return collected

    def _done_callback(fut):
        nonlocal ndone
        ndone += 1

        if outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if not return_exceptions:
            if fut.cancelled():
                # Must be tested before fut.exception(), which raises
                # CancelledError for a cancelled future.
                outer.set_exception(exceptions.CancelledError())
                return
            exc = fut.exception()
            if exc is not None:
                outer.set_exception(exc)
                return

        if ndone != nunique:
            # Some children are still pending.
            return

        # All futures are done; build the result list and settle 'outer'.
        results = _collect_results()
        if outer._cancel_requested:
            # If gather is being cancelled we must propagate the
            # cancellation regardless of *return_exceptions* argument.
            # See issue 32684.
            outer.set_exception(exceptions.CancelledError())
        else:
            outer.set_result(results)

    seen = {}       # argument -> future created (or reused) for it
    children = []   # one future per positional argument, duplicates kept
    nunique = 0     # number of distinct futures being waited on
    ndone = 0       # how many of those have completed so far
    for arg in coros_or_futures:
        if arg in seen:
            # There's a duplicate Future object in coros_or_futures.
            children.append(seen[arg])
            continue

        fut = ensure_future(arg, loop=loop)
        if loop is None:
            loop = futures._get_loop(fut)
        if fut is not arg:
            # 'arg' was not a Future, therefore 'fut' is a new Future
            # created specifically for 'arg'.  Since the caller can't
            # control it, disable the "destroy pending task" warning.
            fut._log_destroy_pending = False

        nunique += 1
        seen[arg] = fut
        fut.add_done_callback(_done_callback)
        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    return outer
780
781
def shield(arg, *, loop=None):
    """Wait for a future, shielding it from cancellation.

    The statement

        res = await shield(something())

    is exactly equivalent to the statement

        res = await something()

    *except* that if the coroutine containing it is cancelled, the task
    running in something() is not cancelled.  From the point of view of
    something(), the cancellation did not happen.  Its caller is still
    cancelled, though, so the await expression still raises
    CancelledError.  Note: if something() is cancelled by other means,
    this will still cancel shield().

    If you want to completely ignore cancellation (not recommended)
    you can combine shield() with a try/except clause, as follows:

        try:
            res = await shield(something())
        except CancelledError:
            res = None
    """
    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Shortcut: nothing left to protect, hand the future back as is.
        return inner

    loop = futures._get_loop(inner)
    outer = loop.create_future()

    def _done_callback(finished):
        if outer.cancelled():
            # The caller stopped waiting; nobody will read the outcome.
            if not finished.cancelled():
                # Mark inner's result as retrieved.
                finished.exception()
            return

        if finished.cancelled():
            outer.cancel()
            return

        exc = finished.exception()
        if exc is None:
            outer.set_result(finished.result())
        else:
            outer.set_exception(exc)

    inner.add_done_callback(_done_callback)
    return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700833
834
def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    *coro* is scheduled on *loop* via ``loop.call_soon_threadsafe()``.

    Return a concurrent.futures.Future to access the result.

    Raises TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    future = concurrent.futures.Future()

    def callback():
        try:
            futures._chain_future(ensure_future(coro, loop=loop), future)
        except (SystemExit, KeyboardInterrupt):
            # Interpreter-level exits are passed through untouched.
            raise
        except BaseException as exc:
            # Catch BaseException rather than Exception: an error that
            # does not derive from Exception would otherwise leave
            # 'future' pending forever, hanging any thread blocked in
            # future.result().
            if future.set_running_or_notify_cancel():
                future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return future
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200854
855
# Weak set of every task that is still alive.  Entries are added by
# _register_task() and removed by _unregister_task().
_all_tasks = weakref.WeakSet()

# Mapping {EventLoop: Task} of the task currently being executed in
# each running event loop.  Maintained by _enter_task()/_leave_task().
_current_tasks = {}
862
863
def _register_task(task):
    """Record *task* in the module-wide ``_all_tasks`` set."""
    _all_tasks.add(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200867
868
def _enter_task(loop, task):
    """Mark *task* as the one currently executing in *loop*.

    Raises RuntimeError if *loop* already has a current task recorded.
    """
    current_task = _current_tasks.get(loop)
    if current_task is None:
        _current_tasks[loop] = task
        return
    raise RuntimeError(f"Cannot enter into task {task!r} while another "
                       f"task {current_task!r} is being executed.")
875
876
def _leave_task(loop, task):
    """Clear the current-task record of *loop*.

    Raises RuntimeError unless *task* is the very task recorded as
    current for *loop*.
    """
    current_task = _current_tasks.get(loop)
    if current_task is task:
        del _current_tasks[loop]
        return
    raise RuntimeError(f"Leaving task {task!r} does not match "
                       f"the current task {current_task!r}.")
883
884
def _unregister_task(task):
    """Drop *task* from ``_all_tasks``; a task that is absent is ignored."""
    _all_tasks.discard(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200888
889
# Keep references to the pure-Python implementations under _py_* names,
# so they stay reachable if the plain names get rebound later on.
(_py_register_task, _py_unregister_task,
 _py_enter_task, _py_leave_task) = (_register_task, _unregister_task,
                                    _enter_task, _leave_task)
894
895
# Use the implementations from the _asyncio extension module when it can
# be imported (the _c_* aliases suggest it is the C version).  On success
# the helper functions and both registries are rebound to the imported
# objects; on ImportError the existing definitions stay in effect.
try:
    from _asyncio import (_register_task, _unregister_task,
                          _enter_task, _leave_task,
                          _all_tasks, _current_tasks)
except ImportError:
    pass
else:
    (_c_register_task, _c_unregister_task,
     _c_enter_task, _c_leave_task) = (_register_task, _unregister_task,
                                      _enter_task, _leave_task)