blob: f5de1a2eea99f4007739226720086693eff08f5c [file] [log] [blame]
"""Support for tasks, coroutines and the scheduler."""

# Names exported by a star-import of this module.  Besides the public API,
# the tuple deliberately lists four underscore-prefixed task bookkeeping
# helpers (_register_task, _unregister_task, _enter_task, _leave_task).
__all__ = (
    'Task', 'create_task',
    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
    'wait', 'wait_for', 'as_completed', 'sleep',
    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
    'current_task', 'all_tasks',
    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import concurrent.futures
Yury Selivanovf23746a2018-01-22 19:11:18 -050013import contextvars
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import functools
15import inspect
Alex Grönholmcca4eec2018-08-09 00:06:47 +030016import itertools
Andrew Svetlov5f841b52017-12-09 00:23:48 +020017import types
Yury Selivanov59eb9a42015-05-11 14:48:38 -040018import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019import weakref
20
Yury Selivanova0c1ba62016-10-28 12:52:37 -040021from . import base_tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020022from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070024from . import exceptions
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025from . import futures
Andrew Svetlov68b34a72019-05-16 17:52:10 +030026from .coroutines import _is_coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027
# Helper to generate new task names.
# Each call returns the next integer of a shared counter that starts at 1.
# This uses itertools.count() instead of a "+= 1" operation because the latter
# is not thread safe. See bpo-11866 for a longer explanation.
_task_name_counter = itertools.count(1).__next__
32
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033
def current_task(loop=None):
    """Return the task currently executing in *loop*, or None.

    When *loop* is omitted, the running event loop is looked up with
    events.get_running_loop().
    """
    target_loop = events.get_running_loop() if loop is None else loop
    return _current_tasks.get(target_loop)
39
40
def all_tasks(loop=None):
    """Return the set of tasks of *loop* that are not done yet.

    When *loop* is omitted, the running event loop is used.
    """
    if loop is None:
        loop = events.get_running_loop()
    # _all_tasks is a WeakSet that another thread may update while we
    # iterate over it, so take a list snapshot first.  Building the list
    # iterates as well and may therefore raise RuntimeError; retry until
    # it succeeds and give up (re-raising) on the 1000th failure.
    # See issues 34970 and 36607 for details.
    failures = 0
    while True:
        try:
            snapshot = list(_all_tasks)
            break
        except RuntimeError:
            failures += 1
            if failures >= 1000:
                raise
    unfinished = set()
    for task in snapshot:
        if futures._get_loop(task) is loop and not task.done():
            unfinished.add(task)
    return unfinished
62
63
def _all_tasks_compat(loop=None):
    # Unlike all_tasks(), this returns *every* task of the loop, finished
    # ones included.  It backs the deprecated Task.all_tasks() classmethod.
    if loop is None:
        loop = events.get_event_loop()
    # _all_tasks is a WeakSet that another thread may update while we
    # iterate over it, so take a list snapshot first.  Building the list
    # iterates as well and may therefore raise RuntimeError; retry until
    # it succeeds and give up (re-raising) on the 1000th failure.
    # See issues 34970 and 36607 for details.
    failures = 0
    while True:
        try:
            snapshot = list(_all_tasks)
        except RuntimeError:
            failures += 1
            if failures < 1000:
                continue
            raise
        break
    owned = set()
    for task in snapshot:
        if futures._get_loop(task) is loop:
            owned.add(task)
    return owned
Andrew Svetlov44d1a592017-12-16 21:58:38 +020086
87
def _set_task_name(task, name):
    """Call task.set_name(name) when a name is given and the method exists.

    Does nothing when *name* is None or when *task* has no ``set_name``
    attribute.
    """
    if name is None:
        return
    absent = object()
    setter = getattr(task, 'set_name', absent)
    if setter is not absent:
        setter(name)
96
97
class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.

        Deprecated: emits a DeprecationWarning and forwards to the
        module-level current_task().
        """
        warnings.warn("Task.current_task() is deprecated since Python 3.7, "
                      "use asyncio.current_task() instead",
                      DeprecationWarning,
                      stacklevel=2)
        if loop is None:
            loop = events.get_event_loop()
        return current_task(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.

        Deprecated: emits a DeprecationWarning and forwards to
        _all_tasks_compat().
        """
        warnings.warn("Task.all_tasks() is deprecated since Python 3.7, "
                      "use asyncio.all_tasks() instead",
                      DeprecationWarning,
                      stacklevel=2)
        return _all_tasks_compat(loop)

    def __init__(self, coro, *, loop=None, name=None):
        """Wrap *coro* in a task and schedule its first step on the loop.

        *name* is stored as str(name); when it is None a name of the form
        'Task-<n>' is generated from the module-level counter.

        Raises TypeError if coroutines.iscoroutine(coro) is false.
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the last captured entry (presumably this constructor's
            # own frame -- confirm against the Future implementation).
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        # Set by cancel() when the cancellation has to be delivered by the
        # next __step() call.
        self._must_cancel = False
        # The future this task is currently blocked on, if any.
        self._fut_waiter = None
        self._coro = coro
        # Every step and wakeup callback is scheduled with this context.
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    def __del__(self):
        """Report a still-pending task on destruction, then defer to the base.

        The report goes through the loop's call_exception_handler() and is
        skipped when _log_destroy_pending is False.
        """
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    def __class_getitem__(cls, type):
        """Return the class unchanged, so that Task[X] evaluates to Task."""
        return cls

    def _repr_info(self):
        """Return the repr details computed by base_tasks._task_repr_info()."""
        return base_tasks._task_repr_info(self)

    def get_coro(self):
        """Return the coroutine object wrapped by this task."""
        return self._coro

    def get_name(self):
        """Return the name of this task."""
        return self._name

    def set_name(self, value):
        """Set the name of this task to str(value)."""
        self._name = str(value)

    def set_result(self, result):
        """Unsupported on tasks: always raises RuntimeError."""
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        """Unsupported on tasks: always raises RuntimeError."""
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        return True

    def __step(self, exc=None):
        """Advance the wrapped coroutine by one step.

        When *exc* is None the coroutine is resumed with send(None);
        otherwise *exc* is thrown into it.  A pending cancellation request
        (_must_cancel) replaces *exc* with a CancelledError unless *exc*
        already is one.

        The outcome decides what happens next: completion or an exception
        settles this task, a yielded future is waited on via __wakeup(),
        a bare yield reschedules the step, and anything else reschedules
        the step with a RuntimeError to throw into the coroutine.

        Raises InvalidStateError if the task is already done, and
        re-raises KeyboardInterrupt / SystemExit after recording them.
        """
        if self.done():
            raise exceptions.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, exceptions.CancelledError):
                exc = exceptions.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            # The coroutine returned; exc.value carries its return value.
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().cancel()
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
            # Record the exception on the task, then let it propagate.
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            # The coroutine yielded `result` and is still suspended.
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        # Wait for the future: __wakeup() resumes the
                        # coroutine once the future is done.
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            # cancel() was requested while the coroutine
                            # was running; forward it to the new waiter.
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        """Done-callback for the awaited future: resume the coroutine.

        If future.result() raises, the exception is passed to __step() so
        it gets thrown into the coroutine; otherwise __step() is called
        without arguments.
        """
        try:
            future.result()
        except BaseException as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.
365
366
# Keep the pure-Python class reachable under a private name, because the
# public name ``Task`` may be rebound just below.
_PyTask = Task


# If the optional _asyncio module is importable, its Task replaces the
# Python implementation (presumably the C-accelerated version -- confirm);
# otherwise the class defined above stays in place.
try:
    import _asyncio
except ImportError:
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task
377
378
def create_task(coro, *, name=None):
    """Schedule the coroutine *coro* for execution as a new task.

    The task is created on the running event loop; if *name* is not
    None it is applied to the task through _set_task_name().

    Return the Task object.
    """
    new_task = events.get_running_loop().create_task(coro)
    _set_task_name(new_task, name)
    return new_task
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200388
389
# wait() and as_completed() similar to those in PEP 3148.

# The values accepted by wait() for its return_when argument, re-exported
# from concurrent.futures.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
395
396
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures and coroutines given by fs to complete.

    The sequence futures must not be empty.

    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = await asyncio.wait(fs)

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.

    *fs* may be any iterable, including a one-shot iterator such as a
    generator; it is consumed exactly once.

    Raises TypeError if *fs* is itself a future or a coroutine, and
    ValueError if *fs* is empty or *return_when* is not one of
    FIRST_COMPLETED, FIRST_EXCEPTION or ALL_COMPLETED.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError(f'Invalid return_when value: {return_when}')

    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    # Materialize the input exactly once.  Iterating it twice (once for the
    # deprecation check, once to build the futures) would drain a one-shot
    # iterator on the first pass and leave nothing to wait for.
    fs = set(fs)
    if not fs:
        # An exhausted iterator is truthy, so it slips past the check above.
        raise ValueError('Set of coroutines/Futures is empty.')

    if any(coroutines.iscoroutine(f) for f in fs):
        warnings.warn("The explicit passing of coroutine objects to "
                      "asyncio.wait() is deprecated since Python 3.8, and "
                      "scheduled for removal in Python 3.11.",
                      DeprecationWarning, stacklevel=2)

    fs = {ensure_future(f, loop=loop) for f in fs}

    return await _wait(fs, timeout, return_when, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700436
437
def _release_waiter(waiter, *args):
    """Resolve *waiter* with None unless it is already done.

    Extra positional arguments are accepted and ignored, so the function
    can be used directly as a callback.
    """
    if waiter.done():
        return
    waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700441
442
async def wait_for(fut, timeout, *, loop=None):
    """Wait for the single Future or coroutine to complete, with timeout.

    Coroutine will be wrapped in Task.

    Returns result of the Future or coroutine.  When a timeout occurs,
    it cancels the task and raises TimeoutError.  To avoid the task
    cancellation, wrap it in shield().

    If the wait is cancelled, the task is also cancelled.

    This function is a coroutine.

    A timeout of None awaits *fut* directly with no time limit.  A
    timeout <= 0 returns the result only if *fut* is already done;
    otherwise cancel() is called on it and TimeoutError is raised at once.
    Passing an explicit *loop* emits a DeprecationWarning.
    """
    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    if timeout is None:
        return await fut

    if timeout <= 0:
        fut = ensure_future(fut, loop=loop)

        if fut.done():
            return fut.result()

        fut.cancel()
        raise exceptions.TimeoutError()

    # `waiter` is resolved by whichever happens first: the timer firing or
    # `fut` completing (through the done-callback `cb`).
    waiter = loop.create_future()
    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    cb = functools.partial(_release_waiter, waiter)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(cb)

    try:
        # wait until the future completes or the timeout
        try:
            await waiter
        except exceptions.CancelledError:
            # wait_for() itself was cancelled: propagate that to `fut`.
            fut.remove_done_callback(cb)
            fut.cancel()
            raise

        if fut.done():
            return fut.result()
        else:
            # The timer fired first.
            fut.remove_done_callback(cb)
            # We must ensure that the task is not running
            # after wait_for() returns.
            # See https://bugs.python.org/issue32751
            await _cancel_and_wait(fut, loop=loop)
            # In case task cancellation failed with some
            # exception, we should re-raise it
            # See https://bugs.python.org/issue40607
            try:
                fut.result()
            except exceptions.CancelledError as exc:
                raise exceptions.TimeoutError() from exc
            else:
                raise exceptions.TimeoutError()
    finally:
        # Always drop the timer, whichever way the wait ended.
        timeout_handle.cancel()
510
511
async def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    *fs* must be a non-empty collection of Futures.  Returns the pair
    (done, pending) describing their state once the wait is over.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    if timeout is None:
        timeout_handle = None
    else:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    remaining = len(fs)

    def _on_completion(f):
        # Invoked once per finished future; wakes the waiter as soon as
        # the condition selected by return_when is met.
        nonlocal remaining
        remaining -= 1
        if remaining <= 0:
            wake = True
        elif return_when == FIRST_COMPLETED:
            wake = True
        elif return_when == FIRST_EXCEPTION:
            wake = not f.cancelled() and f.exception() is not None
        else:
            wake = False
        if not wake:
            return
        if timeout_handle is not None:
            timeout_handle.cancel()
        if not waiter.done():
            waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        await waiter
    finally:
        # Undo the timer and the callbacks however the wait ended.
        if timeout_handle is not None:
            timeout_handle.cancel()
        for f in fs:
            f.remove_done_callback(_on_completion)

    done, pending = set(), set()
    for f in fs:
        (done if f.done() else pending).add(f)
    return done, pending
554
555
async def _cancel_and_wait(fut, loop):
    """Request cancellation of *fut* and wait until it is done."""

    finished = loop.create_future()
    on_done = functools.partial(_release_waiter, finished)
    fut.add_done_callback(on_done)

    try:
        fut.cancel()
        # Await a separate future rather than *fut* itself, so that
        # _cancel_and_wait stays reliably cancellable.
        await finished
    finally:
        fut.remove_done_callback(on_done)
570
571
# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    When waiting for the yielded coroutines you'll get the results (or
    exceptions!) of the original Futures (or coroutines), in the order
    in which and as soon as they complete.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = await f  # The 'await' may raise.
            # Use result.

    If a timeout is specified, the 'await' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.

    Raises TypeError if *fs* is itself a future or a coroutine.  Passing
    an explicit *loop* emits a DeprecationWarning.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")

    from .queues import Queue  # Import here to avoid circular import problem.
    # NOTE(review): the queue receives the caller-supplied loop (possibly
    # None) *before* the default is resolved below -- presumably deliberate;
    # confirm before reordering these statements.
    done = Queue(loop=loop)

    if loop is None:
        loop = events.get_event_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    todo = {ensure_future(f, loop=loop) for f in set(fs)}
    # Rebound further down when a timeout is given; the closures below read
    # it at call time.
    timeout_handle = None

    def _on_timeout():
        # Stop tracking every unfinished future and queue one dummy per
        # future so that each remaining _wait_for_one() raises TimeoutError.
        for f in todo:
            f.remove_done_callback(_on_completion)
            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
        todo.clear()  # Can't do todo.remove(f) in the loop.

    def _on_completion(f):
        # Done-callback: hand the finished future over to the consumer.
        if not todo:
            return  # _on_timeout() was here first.
        todo.remove(f)
        done.put_nowait(f)
        if not todo and timeout_handle is not None:
            timeout_handle.cancel()

    async def _wait_for_one():
        # Yields the result of whichever future finishes next.
        f = await done.get()
        if f is None:
            # Dummy value from _on_timeout().
            raise exceptions.TimeoutError
        return f.result()  # May raise f.exception().

    for f in todo:
        f.add_done_callback(_on_completion)
    if todo and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    # One coroutine per input future, regardless of completion order.
    for _ in range(len(todo)):
        yield _wait_for_one()
633
634
@types.coroutine
def __sleep0():
    """Suspend the caller for exactly one iteration of the event loop.

    Private helper for 'asyncio.sleep()', which awaits it when the
    requested delay is zero or negative.  A bare 'yield' is used
    instead of allocating a Future; the docstring this replaces states
    that Task.__step knows how to handle such a bare yield.
    """
    yield
645
646
async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    *result* is handed back to the awaiter once the delay has elapsed.
    A zero or negative *delay* only yields control to the event loop
    once and returns immediately afterwards, without touching *loop*.

    The *loop* argument is deprecated; passing it emits a
    DeprecationWarning.
    """
    if delay <= 0:
        await __sleep0()
        return result

    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_running_loop()

    waiter = loop.create_future()
    timer = loop.call_later(
        delay, futures._set_result_unless_cancelled, waiter, result)
    try:
        return await waiter
    finally:
        # Always drop the timer, e.g. when the awaiting task is cancelled
        # before the delay has elapsed.
        timer.cancel()
668
669
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    A Future argument is returned unchanged; a ValueError is raised if
    *loop* is given and is not the loop the future belongs to.
    A coroutine is scheduled as a task on *loop* (or on the loop returned
    by events.get_event_loop() when *loop* is None).
    Any other awaitable is first wrapped by _wrap_awaitable() and then
    handled like a coroutine.  Anything else raises TypeError.
    """
    if coroutines.iscoroutine(coro_or_future):
        target_loop = events.get_event_loop() if loop is None else loop
        task = target_loop.create_task(coro_or_future)
        if task._source_traceback:
            # Drop the innermost recorded frame (the create_task() call
            # made right here).
            del task._source_traceback[-1]
        return task

    if futures.isfuture(coro_or_future):
        if loop is not None and futures._get_loop(coro_or_future) is not loop:
            raise ValueError('The future belongs to a different loop than '
                             'the one specified as the loop argument')
        return coro_or_future

    if not inspect.isawaitable(coro_or_future):
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')

    return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
Yury Selivanov620279b2015-10-02 15:00:19 -0400692
693
@types.coroutine
def _wrap_awaitable(awaitable):
    """Helper for asyncio.ensure_future().

    Adapts an arbitrary awaitable (any object providing __await__) into a
    generator-based coroutine, which ensure_future() then wraps in a Task.
    The coroutine's return value is whatever the awaitable produces.
    """
    outcome = yield from awaitable.__await__()
    return outcome


# Tag the helper with the marker object imported from the coroutines module.
# NOTE(review): presumably this makes asyncio treat it as a coroutine
# function -- confirm against coroutines.iscoroutinefunction().
_wrap_awaitable._is_coroutine = _is_coroutine
704
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700705
class _GatheringFuture(futures.Future):
    """Helper for gather().

    The outer future returned by gather().  Its cancel() forwards the
    request to every child instead of marking itself cancelled right away,
    which makes it behave more like Task.cancel().
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        # Child futures that cancel() forwards to.
        self._children = children
        # Becomes True once cancel() managed to cancel at least one child.
        self._cancel_requested = False

    def cancel(self):
        """Request cancellation of all children.

        Return False if this future is already done or if no child could
        be cancelled; return True otherwise.
        """
        if self.done():
            return False

        # Build the full list first so that every child receives the
        # request; any() on a generator would stop at the first success.
        outcomes = [child.cancel() for child in self._children]
        any_cancelled = any(outcomes)
        if any_cancelled:
            # If any child tasks were actually cancelled, we should
            # propagate the cancellation request regardless of
            # *return_exceptions* argument.  See issue 32684.
            self._cancel_requested = True
        return any_cancelled
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700732
733
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines/futures.

    Coroutines will be wrapped in a future (via ensure_future()) and
    scheduled in the event loop.  They will not necessarily be scheduled
    in the same order as passed in.

    All futures must share the same event loop.  If all the tasks are
    done successfully, the returned future's result is the list of
    results (in the order of the original sequence, not necessarily
    the order of results arrival).  An argument passed more than once is
    only awaited once; its result is repeated in the list.

    If *return_exceptions* is True, exceptions in the tasks are treated
    the same as successful results, and gathered in the result list;
    otherwise, the first raised exception will be immediately propagated
    to the returned future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)

    The *loop* argument is deprecated: a DeprecationWarning is emitted
    whenever it is passed, whether or not any awaitables are given.
    """
    if loop is not None:
        # Warn on every call that passes *loop*, not only on the
        # empty-arguments path, so the deprecation is reported the same
        # way sleep() and shield() report it.
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    if not coros_or_futures:
        # Nothing to wait for: hand back an already-completed future.
        if loop is None:
            loop = events.get_event_loop()
        outer = loop.create_future()
        outer.set_result([])
        return outer

    def _done_callback(fut):
        nonlocal nfinished
        nfinished += 1

        if outer.done():
            # The outer future was already resolved (first exception or
            # cancellation); just consume this child's outcome.
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if not return_exceptions:
            if fut.cancelled():
                # Check if 'fut' is cancelled first, as
                # 'fut.exception()' will *raise* a CancelledError
                # instead of returning it.
                exc = exceptions.CancelledError()
                outer.set_exception(exc)
                return
            else:
                exc = fut.exception()
                if exc is not None:
                    outer.set_exception(exc)
                    return

        if nfinished == nfuts:
            # All futures are done; create a list of results
            # and set it to the 'outer' future.
            results = []

            for fut in children:
                if fut.cancelled():
                    # Check if 'fut' is cancelled first, as
                    # 'fut.exception()' will *raise* a CancelledError
                    # instead of returning it.
                    res = exceptions.CancelledError()
                else:
                    res = fut.exception()
                    if res is None:
                        res = fut.result()
                results.append(res)

            if outer._cancel_requested:
                # If gather is being cancelled we must propagate the
                # cancellation regardless of *return_exceptions* argument.
                # See issue 32684.
                outer.set_exception(exceptions.CancelledError())
            else:
                outer.set_result(results)

    arg_to_fut = {}   # original argument -> future wrapping it
    children = []     # one future per positional argument, in call order
    nfuts = 0         # number of distinct futures being awaited
    nfinished = 0     # number of distinct futures that have completed
    for arg in coros_or_futures:
        if arg not in arg_to_fut:
            fut = ensure_future(arg, loop=loop)
            if loop is None:
                # Adopt the loop of the first child for everything else.
                loop = futures._get_loop(fut)
            if fut is not arg:
                # 'arg' was not a Future, therefore, 'fut' is a new
                # Future created specifically for 'arg'.  Since the caller
                # can't control it, disable the "destroy pending task"
                # warning.
                fut._log_destroy_pending = False

            nfuts += 1
            arg_to_fut[arg] = fut
            fut.add_done_callback(_done_callback)

        else:
            # There's a duplicate Future object in coros_or_futures.
            fut = arg_to_fut[arg]

        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    return outer
845
846
def shield(arg, *, loop=None):
    """Wait for a future, shielding it from cancellation.

    The statement

        res = await shield(something())

    behaves like

        res = await something()

    *except* that cancelling the coroutine that contains it does not
    cancel the task running something().  From the point of view of
    something() no cancellation happened, while the caller is still
    cancelled and its 'await' still raises CancelledError.  Note: if
    something() is cancelled by other means, shield() is cancelled too.

    To ignore cancellation completely (not recommended), combine
    shield() with a try/except clause:

        try:
            res = await shield(something())
        except CancelledError:
            res = None

    The *loop* argument is deprecated; passing it emits a
    DeprecationWarning.
    """
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Shortcut: nothing left to protect.
        return inner

    outer = futures._get_loop(inner).create_future()

    def _inner_done_callback(fut):
        # 'fut' is the inner future that has just completed.
        if outer.cancelled():
            if not fut.cancelled():
                # Mark inner's result as retrieved.
                fut.exception()
            return

        if fut.cancelled():
            outer.cancel()
            return

        error = fut.exception()
        if error is None:
            outer.set_result(fut.result())
        else:
            outer.set_exception(error)

    def _outer_done_callback(_finished_outer):
        # Once the outer future is done, stop watching a still-pending
        # inner future.
        if not inner.done():
            inner.remove_done_callback(_inner_done_callback)

    inner.add_done_callback(_inner_done_callback)
    outer.add_done_callback(_outer_done_callback)
    return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700908
909
def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    The coroutine is scheduled on *loop* through call_soon_threadsafe().
    Return a concurrent.futures.Future to access the result.
    Raise TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')

    result_future = concurrent.futures.Future()

    def callback():
        try:
            task = ensure_future(coro, loop=loop)
            futures._chain_future(task, result_future)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # Report the scheduling failure to the waiting caller (unless
            # it has already cancelled) and re-raise it in the loop.
            if result_future.set_running_or_notify_cancel():
                result_future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return result_future
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200931
932
# Every task that is still alive.  Held in a WeakSet, so membership here
# does not keep a task from being garbage collected.
_all_tasks = weakref.WeakSet()

# The task currently being executed by each running event loop,
# as a mapping {EventLoop: Task}.
_current_tasks = dict()
939
940
def _register_task(task):
    """Record *task* in the module-wide collection of alive tasks."""
    _all_tasks.add(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200944
945
def _enter_task(loop, task):
    """Mark *task* as the one currently executing on *loop*.

    Raise RuntimeError if another task is already recorded as current
    for that loop.
    """
    running = _current_tasks.get(loop)
    if running is None:
        _current_tasks[loop] = task
        return
    raise RuntimeError(f"Cannot enter into task {task!r} while another "
                       f"task {running!r} is being executed.")
952
953
def _leave_task(loop, task):
    """Clear the current-task record of *loop*, which must be *task*.

    Raise RuntimeError if the task recorded as current for *loop* is not
    the very same object as *task*.
    """
    running = _current_tasks.get(loop)
    if running is task:
        del _current_tasks[loop]
        return
    raise RuntimeError(f"Leaving task {task!r} does not match "
                       f"the current task {running!r}.")
960
961
def _unregister_task(task):
    """Drop *task* from the collection of alive tasks, if it is there."""
    _all_tasks.discard(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200965
966
# Keep references to the implementations defined above under _py_* names
# before the import below may rebind the plain names.
_py_register_task, _py_unregister_task = _register_task, _unregister_task
_py_enter_task, _py_leave_task = _enter_task, _leave_task


try:
    # If the _asyncio module is importable, its versions replace the
    # functions and registries defined in this file.
    from _asyncio import (_register_task, _unregister_task,
                          _enter_task, _leave_task,
                          _all_tasks, _current_tasks)
except ImportError:
    pass
else:
    # Expose the _asyncio implementations under explicit _c_* names too.
    _c_register_task, _c_unregister_task = _register_task, _unregister_task
    _c_enter_task, _c_leave_task = _enter_task, _leave_task