blob: a3a0a33ee03dacb6cf3d1d7c649c7853514a5f8c [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
# Names exported by ``from asyncio.tasks import *`` (one per line, original
# order preserved).
__all__ = (
    'Task',
    'create_task',
    'FIRST_COMPLETED',
    'FIRST_EXCEPTION',
    'ALL_COMPLETED',
    'wait',
    'wait_for',
    'as_completed',
    'sleep',
    'gather',
    'shield',
    'ensure_future',
    'run_coroutine_threadsafe',
    'current_task',
    'all_tasks',
    '_register_task',
    '_unregister_task',
    '_enter_task',
    '_leave_task',
)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import concurrent.futures
Yury Selivanovf23746a2018-01-22 19:11:18 -050013import contextvars
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import functools
15import inspect
Alex Grönholmcca4eec2018-08-09 00:06:47 +030016import itertools
Andrew Svetlov5f841b52017-12-09 00:23:48 +020017import types
Yury Selivanov59eb9a42015-05-11 14:48:38 -040018import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019import weakref
20
Yury Selivanova0c1ba62016-10-28 12:52:37 -040021from . import base_tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020022from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070024from . import exceptions
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025from . import futures
Andrew Svetlov68b34a72019-05-16 17:52:10 +030026from .coroutines import _is_coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027
# Helper to generate new task names.
# Each call returns the next integer, starting at 1; Task.__init__ uses it
# to build the default 'Task-<n>' name when no explicit name is given.
# This uses itertools.count() instead of a "+= 1" operation because the latter
# is not thread safe. See bpo-11866 for a longer explanation.
_task_name_counter = itertools.count(1).__next__
32
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033
def current_task(loop=None):
    """Return the task currently being executed on *loop*.

    When *loop* is omitted, the running event loop is used.  The answer is
    whatever the module-level ``_current_tasks`` registry holds for that
    loop (looked up with ``.get()``).
    """
    target_loop = events.get_running_loop() if loop is None else loop
    return _current_tasks.get(target_loop)
39
40
def all_tasks(loop=None):
    """Return the set of not-yet-done tasks that belong to *loop*.

    When *loop* is omitted, the running event loop is used.
    """
    if loop is None:
        loop = events.get_running_loop()
    # _all_tasks is a WeakSet that another thread may mutate while we walk
    # it, so take a list snapshot first and filter that.  Building the list
    # iterates the set too, hence the retry on RuntimeError (unlikely in
    # practice); the 1000th consecutive failure is re-raised.  See issues
    # 34970 and 36607 for details.
    for attempt in itertools.count(1):
        try:
            snapshot = list(_all_tasks)
        except RuntimeError:
            if attempt >= 1000:
                raise
        else:
            break
    return {task for task in snapshot
            if futures._get_loop(task) is loop and not task.done()}
62
63
def _all_tasks_compat(loop=None):
    """Return every task bound to *loop*, finished ones included.

    Unlike all_tasks(), completed tasks are not filtered out, and the
    default loop comes from events.get_event_loop().  This backs the
    deprecated Task.all_tasks() classmethod.
    """
    if loop is None:
        loop = events.get_event_loop()
    # _all_tasks is a WeakSet that another thread may mutate while we walk
    # it, so take a list snapshot first and filter that.  Building the list
    # iterates the set too, hence the retry on RuntimeError (unlikely in
    # practice); the 1000th consecutive failure is re-raised.  See issues
    # 34970 and 36607 for details.
    snapshot = None
    failures = 0
    while snapshot is None:
        try:
            snapshot = list(_all_tasks)
        except RuntimeError:
            failures += 1
            if failures >= 1000:
                raise
    return {task for task in snapshot if futures._get_loop(task) is loop}
Andrew Svetlov44d1a592017-12-16 21:58:38 +020086
87
def _set_task_name(task, name):
    """Apply *name* to *task* when a name was given and the task supports it.

    Nothing happens if *name* is None or if *task* has no ``set_name``
    attribute.  Errors raised by ``set_name`` itself are not suppressed.
    """
    if name is None:
        return
    try:
        setter = task.set_name
    except AttributeError:
        return
    setter(name)
96
97
class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task is not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.

        Deprecated: emits a DeprecationWarning and delegates to the
        module-level current_task().
        """
        warnings.warn("Task.current_task() is deprecated since Python 3.7, "
                      "use asyncio.current_task() instead",
                      DeprecationWarning,
                      stacklevel=2)
        if loop is None:
            loop = events.get_event_loop()
        return current_task(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.

        Deprecated: emits a DeprecationWarning and delegates to
        _all_tasks_compat(), which also includes completed tasks.
        """
        warnings.warn("Task.all_tasks() is deprecated since Python 3.7, "
                      "use asyncio.all_tasks() instead",
                      DeprecationWarning,
                      stacklevel=2)
        return _all_tasks_compat(loop)

    def __init__(self, coro, *, loop=None, name=None):
        """Wrap *coro* and schedule its first step on the loop.

        *name* is stored as str(name); when it is None a 'Task-<n>' name
        is generated.  Raises TypeError if *coro* is not a coroutine.
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the innermost frame of the recorded creation traceback.
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        # Set by cancel() when the cancellation has to be delivered by the
        # next __step() call.
        self._must_cancel = False
        # The future this task is currently waiting on, if any.
        self._fut_waiter = None
        self._coro = coro
        # Snapshot of the current context; every step and wakeup callback
        # is scheduled with it.
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    def __del__(self):
        """Report a still-pending task to the loop's exception handler.

        The report is skipped when _log_destroy_pending is false.
        """
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    def __class_getitem__(cls, type):
        """Support ``Task[...]`` subscription; returns the class unchanged."""
        return cls

    def _repr_info(self):
        """Return repr details as computed by base_tasks._task_repr_info()."""
        return base_tasks._task_repr_info(self)

    def get_coro(self):
        """Return the coroutine object wrapped by this task."""
        return self._coro

    def get_name(self):
        """Return the task's name."""
        return self._name

    def set_name(self, value):
        """Set the task's name to str(value)."""
        self._name = str(value)

    def set_result(self, result):
        """Always raise RuntimeError: a Task's result cannot be set directly."""
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        """Always raise RuntimeError: a Task's exception cannot be set directly."""
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self, msg=None):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel(msg=msg):
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        self._cancel_message = msg
        return True

    def __step(self, exc=None):
        """Run the wrapped coroutine until its next suspension point.

        With exc None the coroutine is resumed via send(None); otherwise
        exc is thrown into it.  The outcome either finishes the task
        (result, exception or cancellation), parks it on the yielded
        future, or reschedules another step.  Raises InvalidStateError if
        the task is already done.
        """
        if self.done():
            raise exceptions.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            # A cancel() request is pending: deliver it now unless the
            # exception being thrown already is a CancelledError.
            if not isinstance(exc, exceptions.CancelledError):
                exc = exceptions.CancelledError(''
                    if self._cancel_message is None else self._cancel_message)
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            # The coroutine returned; exc.value carries its return value.
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().cancel(msg=self._cancel_message)
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError as exc:
            # Propagate the first exception argument, if any, as the
            # cancellation message.
            if exc.args:
                cancel_msg = exc.args[0]
            else:
                cancel_msg = None
            super().cancel(msg=cancel_msg)  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
            # Record the exception on the task, then let it keep
            # propagating to the caller.
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            # The coroutine yielded; decide what to do with the value.
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        # Park on the future: __wakeup resumes the task
                        # once the future completes.
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            # cancel() was called during this step;
                            # forward it to the future just yielded.
                            if self._fut_waiter.cancel(
                                    msg=self._cancel_message):
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        """Done-callback for the awaited future: resume the coroutine.

        If the future finished with an exception (including cancellation)
        that exception is passed to __step(); otherwise __step() is called
        with no argument.
        """
        try:
            future.result()
        except BaseException as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.
372
373
# Keep a reference to the pure-Python implementation under its own name
# before ``Task`` is (possibly) rebound below.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    # No C accelerator module available: ``Task`` stays the Python class.
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task
384
385
def create_task(coro, *, name=None):
    """Wrap *coro* in a task scheduled on the running event loop.

    The task is created through the running loop's create_task(); *name*,
    when not None, is then applied via _set_task_name().

    Return the Task object.
    """
    new_task = events.get_running_loop().create_task(coro)
    _set_task_name(new_task, name)
    return new_task
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200395
396
# wait() and as_completed() similar to those in PEP 3148.

# The return_when constants are re-exported from concurrent.futures.
FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED = (
    concurrent.futures.FIRST_COMPLETED,
    concurrent.futures.FIRST_EXCEPTION,
    concurrent.futures.ALL_COMPLETED,
)
402
403
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures and coroutines given by fs to complete.

    fs is an iterable of futures/coroutines and must not be empty.
    Coroutines will be wrapped in Tasks (passing them directly is
    deprecated and triggers a DeprecationWarning).

    loop is deprecated; when given, a DeprecationWarning is emitted.
    timeout, if not None, is the maximum number of seconds to wait.
    return_when must be one of FIRST_COMPLETED, FIRST_EXCEPTION or
    ALL_COMPLETED.

    Returns two sets of Future: (done, pending).

    Raises TypeError if fs is itself a future or a coroutine, and
    ValueError if fs is empty or return_when is invalid.

    Usage:

        done, pending = await asyncio.wait(fs)

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError(f'Invalid return_when value: {return_when}')

    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    # Materialize fs exactly once.  It may be a one-shot iterable (e.g. a
    # generator); iterating it for the deprecation check and then again
    # for ensure_future() would leave the second pass empty.
    fs = set(fs)
    if not fs:
        # An exhausted/empty iterator is truthy, so it slips past the
        # check above; reject it here instead of tripping the assert in
        # _wait().
        raise ValueError('Set of coroutines/Futures is empty.')

    if any(coroutines.iscoroutine(f) for f in fs):
        warnings.warn("The explicit passing of coroutine objects to "
                      "asyncio.wait() is deprecated since Python 3.8, and "
                      "scheduled for removal in Python 3.11.",
                      DeprecationWarning, stacklevel=2)

    fs = {ensure_future(f, loop=loop) for f in fs}

    return await _wait(fs, timeout, return_when, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700443
444
def _release_waiter(waiter, *args):
    """Complete *waiter* with a None result unless it is already done.

    Extra positional arguments are accepted and ignored so the function
    can be used directly as a done-callback or a timer callback.
    """
    if waiter.done():
        return
    waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700448
449
async def wait_for(fut, timeout, *, loop=None):
    """Wait for a single Future or coroutine to finish, bounded by *timeout*.

    A coroutine is wrapped in a Task.

    Returns the result of the Future or coroutine.  If the timeout
    expires first, the task is cancelled and TimeoutError is raised; wrap
    the awaitable in shield() to avoid that cancellation.  A *timeout* of
    None waits without limit.

    If the wait itself is cancelled, the task is cancelled as well.

    Passing *loop* is deprecated and emits a DeprecationWarning.

    This function is a coroutine.
    """
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_running_loop()

    if timeout is None:
        return await fut

    if timeout <= 0:
        # No time budget at all: take the result only if it is already
        # there, otherwise cancel and report the timeout right away.
        fut = ensure_future(fut, loop=loop)
        if fut.done():
            return fut.result()
        fut.cancel()
        raise exceptions.TimeoutError()

    gate = loop.create_future()
    timer = loop.call_later(timeout, _release_waiter, gate)
    release = functools.partial(_release_waiter, gate)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(release)

    try:
        # The gate opens when either the future completes or the timer
        # fires, whichever comes first.
        try:
            await gate
        except exceptions.CancelledError:
            fut.remove_done_callback(release)
            fut.cancel()
            raise

        if fut.done():
            return fut.result()

        fut.remove_done_callback(release)
        # We must ensure that the task is not running
        # after wait_for() returns.
        # See https://bugs.python.org/issue32751
        await _cancel_and_wait(fut, loop=loop)
        # In case task cancellation failed with some
        # exception, we should re-raise it
        # See https://bugs.python.org/issue40607
        try:
            fut.result()
        except exceptions.CancelledError as exc:
            raise exceptions.TimeoutError() from exc
        raise exceptions.TimeoutError()
    finally:
        timer.cancel()
517
518
async def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a non-empty collection of Futures.  Returns
    the (done, pending) pair of sets once the *return_when* condition is
    met or the timeout fires.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    timer = None
    if timeout is not None:
        timer = loop.call_later(timeout, _release_waiter, waiter)
    remaining = len(fs)

    def _on_completion(f):
        nonlocal remaining
        remaining -= 1
        satisfied = remaining <= 0 or return_when == FIRST_COMPLETED
        if not satisfied and return_when == FIRST_EXCEPTION:
            # Only a future that really failed (not a cancelled one)
            # ends a FIRST_EXCEPTION wait early.
            satisfied = not f.cancelled() and f.exception() is not None
        if not satisfied:
            return
        if timer is not None:
            timer.cancel()
        if not waiter.done():
            waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        await waiter
    finally:
        if timer is not None:
            timer.cancel()
        for f in fs:
            f.remove_done_callback(_on_completion)

    done, pending = set(), set()
    for f in fs:
        bucket = done if f.done() else pending
        bucket.add(f)
    return done, pending
561
562
async def _cancel_and_wait(fut, loop):
    """Cancel the *fut* future or task and wait until it completes."""
    gate = loop.create_future()
    release = functools.partial(_release_waiter, gate)
    fut.add_done_callback(release)

    try:
        fut.cancel()
        # Await a separate future rather than *fut* itself so that
        # _cancel_and_wait stays reliably cancellable.
        await gate
    finally:
        fut.remove_done_callback(release)
577
578
# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    Awaiting the yielded coroutines gives the results (or raises the
    exceptions!) of the original Futures (or coroutines), in completion
    order and as soon as each one finishes.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = await f  # The 'await' may raise.
            # Use result.

    If a timeout is specified, the 'await' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Passing *loop* is deprecated and emits a DeprecationWarning.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")

    from .queues import Queue  # Import here to avoid circular import problem.
    finished = Queue(loop=loop)

    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_event_loop()
    outstanding = {ensure_future(f, loop=loop) for f in set(fs)}
    timer = None

    def _on_timeout():
        for f in outstanding:
            f.remove_done_callback(_on_completion)
            finished.put_nowait(None)  # Queue a dummy value for _wait_for_one().
        outstanding.clear()  # Can't do outstanding.remove(f) in the loop.

    def _on_completion(f):
        if not outstanding:
            return  # _on_timeout() was here first.
        outstanding.remove(f)
        finished.put_nowait(f)
        if not outstanding and timer is not None:
            timer.cancel()

    async def _wait_for_one():
        f = await finished.get()
        if f is None:
            # Dummy value from _on_timeout().
            raise exceptions.TimeoutError
        return f.result()  # May raise f.exception().

    for f in outstanding:
        f.add_done_callback(_on_completion)
    if outstanding and timeout is not None:
        timer = loop.call_later(timeout, _on_timeout)
    for _ in range(len(outstanding)):
        yield _wait_for_one()
640
641
@types.coroutine
def __sleep0():
    """Give up control for a single event loop iteration.

    Private helper for 'asyncio.sleep()', which awaits it when the
    requested delay is zero or negative.  Rather than allocating a
    Future, it relies on a bare 'yield' expression (which Task.__step
    knows how to handle).
    """
    yield None
652
653
async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    *delay* is the number of seconds to wait; a value of zero or less
    only yields control to the event loop for one iteration.  *result*
    is returned to the caller once the delay has elapsed.

    Passing *loop* explicitly is deprecated and emits a
    DeprecationWarning, regardless of the value of *delay*.
    """
    # Warn before the zero-delay shortcut so that the deprecation is
    # reported on every code path, not only for positive delays.
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    if delay <= 0:
        await __sleep0()
        return result

    if loop is None:
        loop = events.get_running_loop()

    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        # Always drop the timer, e.g. when the awaiting task is cancelled.
        h.cancel()
675
676
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    A coroutine is scheduled as a task on *loop* (or on the loop from
    events.get_event_loop() when *loop* is None).  A Future is returned
    directly; ValueError is raised if *loop* is given and is not the
    future's own loop.  Any other awaitable is first wrapped in a
    coroutine.  Anything else raises TypeError.
    """
    if coroutines.iscoroutine(coro_or_future):
        target_loop = events.get_event_loop() if loop is None else loop
        task = target_loop.create_task(coro_or_future)
        if task._source_traceback:
            # Drop the newest recorded entry (presumably the frame of
            # this function, so the traceback points at the caller).
            del task._source_traceback[-1]
        return task

    if futures.isfuture(coro_or_future):
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('The future belongs to a different loop than '
                             'the one specified as the loop argument')
        return coro_or_future

    if not inspect.isawaitable(coro_or_future):
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')

    # Generic awaitable: adapt it to a coroutine and go through the
    # coroutine branch above.
    return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
Yury Selivanov620279b2015-10-02 15:00:19 -0400699
700
@types.coroutine
def _wrap_awaitable(awaitable):
    """Adapt an object with __await__ into a coroutine.

    Helper for asyncio.ensure_future(), which later wraps the returned
    coroutine in a Task.
    """
    outcome = yield from awaitable.__await__()
    return outcome


# Tag the helper with the marker object imported from .coroutines.
_wrap_awaitable._is_coroutine = _is_coroutine
711
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700712
class _GatheringFuture(futures.Future):
    """Future returned by gather().

    cancel() is overridden so that it cancels all the children and
    behaves more like Task.cancel(), which doesn't immediately mark
    itself as cancelled.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        # Set by cancel() once at least one child accepted the request.
        self._cancel_requested = False
        self._children = children

    def cancel(self, msg=None):
        """Forward the cancellation to every child.

        Return True if at least one child accepted it, False otherwise
        (including when this future is already done).
        """
        if self.done():
            return False
        # Build the complete list first so every child receives the
        # request; any() on its own would stop at the first success.
        outcomes = [child.cancel(msg=msg) for child in self._children]
        cancelled_any = any(outcomes)
        if cancelled_any:
            # If any child tasks were actually cancelled, the request
            # must be propagated regardless of the *return_exceptions*
            # argument of gather().  See issue 32684.
            self._cancel_requested = True
        return cancelled_any
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700739
740
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines/futures.

    Coroutines will be wrapped in a future and scheduled in the event
    loop. They will not necessarily be scheduled in the same order as
    passed in.

    All futures must share the same event loop.  If all the tasks are
    done successfully, the returned future's result is the list of
    results (in the order of the original sequence, not necessarily
    the order of results arrival).  If *return_exceptions* is True,
    exceptions in the tasks are treated the same as successful
    results, and gathered in the result list; otherwise, the first
    raised exception will be immediately propagated to the returned
    future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)

    Passing *loop* explicitly is deprecated and emits a
    DeprecationWarning, whether or not any awaitables are given.
    """
    # Warn up front so the deprecation is reported on every code path,
    # not only when called without awaitables.
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    if not coros_or_futures:
        # Nothing to wait for: hand back an already-completed future.
        if loop is None:
            loop = events.get_event_loop()
        outer = loop.create_future()
        outer.set_result([])
        return outer

    def _done_callback(fut):
        nonlocal nfinished
        nfinished += 1

        if outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if not return_exceptions:
            if fut.cancelled():
                # Check if 'fut' is cancelled first, as
                # 'fut.exception()' will *raise* a CancelledError
                # instead of returning it.
                exc = exceptions.CancelledError(
                    '' if fut._cancel_message is None
                    else fut._cancel_message)
                outer.set_exception(exc)
                return
            else:
                exc = fut.exception()
                if exc is not None:
                    outer.set_exception(exc)
                    return

        if nfinished == nfuts:
            # All futures are done; create a list of results
            # and set it to the 'outer' future.
            results = []

            for child in children:
                if child.cancelled():
                    # Check if 'child' is cancelled first, as
                    # 'child.exception()' will *raise* a CancelledError
                    # instead of returning it.
                    res = exceptions.CancelledError(
                        '' if child._cancel_message is None
                        else child._cancel_message)
                else:
                    res = child.exception()
                    if res is None:
                        res = child.result()
                results.append(res)

            if outer._cancel_requested:
                # If gather is being cancelled we must propagate the
                # cancellation regardless of *return_exceptions* argument.
                # See issue 32684.
                # The message is read from the last entry of 'children',
                # which is what the previous loop-variable reuse did.
                last_child = children[-1]
                exc = exceptions.CancelledError(
                    '' if last_child._cancel_message is None
                    else last_child._cancel_message)
                outer.set_exception(exc)
            else:
                outer.set_result(results)

    arg_to_fut = {}
    children = []
    nfuts = 0
    nfinished = 0
    for arg in coros_or_futures:
        if arg not in arg_to_fut:
            fut = ensure_future(arg, loop=loop)
            if loop is None:
                # Adopt the loop of the first future for the rest.
                loop = futures._get_loop(fut)
            if fut is not arg:
                # 'arg' was not a Future, therefore, 'fut' is a new
                # Future created specifically for 'arg'.  Since the caller
                # can't control it, disable the "destroy pending task"
                # warning.
                fut._log_destroy_pending = False

            nfuts += 1
            arg_to_fut[arg] = fut
            fut.add_done_callback(_done_callback)

        else:
            # There's a duplicate Future object in coros_or_futures.
            fut = arg_to_fut[arg]

        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    return outer
857
858
def shield(arg, *, loop=None):
    """Wait for a future, shielding it from cancellation.

    The statement

        res = await shield(something())

    is exactly equivalent to the statement

        res = await something()

    *except* that if the coroutine containing it is cancelled, the
    task running in something() is not cancelled.  From the POV of
    something(), the cancellation did not happen.  But its caller is
    still cancelled, so the 'await' expression still raises
    CancelledError.  Note: If something() is cancelled by other means
    this will still cancel shield().

    If you want to completely ignore cancellation (not recommended)
    you can combine shield() with a try/except clause, as follows:

        try:
            res = await shield(something())
        except CancelledError:
            res = None

    Passing *loop* explicitly is deprecated and emits a
    DeprecationWarning.
    """
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Shortcut: nothing left to protect.
        return inner
    outer = futures._get_loop(inner).create_future()

    def _inner_done_callback(finished):
        if outer.cancelled():
            if not finished.cancelled():
                # Mark inner's result as retrieved.
                finished.exception()
            return

        if finished.cancelled():
            outer.cancel()
            return

        # Mirror the inner outcome onto the outer future.
        error = finished.exception()
        if error is None:
            outer.set_result(finished.result())
        else:
            outer.set_exception(error)

    def _outer_done_callback(_completed):
        # Once the outer future is done, stop observing an inner future
        # that is still running.
        if not inner.done():
            inner.remove_done_callback(_inner_done_callback)

    inner.add_done_callback(_inner_done_callback)
    outer.add_done_callback(_outer_done_callback)
    return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700920
921
def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    The coroutine is scheduled through loop.call_soon_threadsafe().
    Return a concurrent.futures.Future to access the result.  Raise
    TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    future = concurrent.futures.Future()

    def callback():
        try:
            futures._chain_future(ensure_future(coro, loop=loop), future)
        except BaseException as exc:
            # SystemExit and KeyboardInterrupt are re-raised untouched;
            # any other failure is also recorded on the returned future
            # (unless it was already cancelled) before propagating.
            if (not isinstance(exc, (SystemExit, KeyboardInterrupt))
                    and future.set_running_or_notify_cancel()):
                future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return future
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200943
944
# All tasks that are still alive.  A WeakSet holds them weakly, so
# membership here does not keep a task from being collected.
_all_tasks = weakref.WeakSet()

# The task currently being executed in each running event loop.
# {EventLoop: Task}
_current_tasks = dict()
951
952
def _register_task(task):
    """Add *task* to the module-wide set of all tasks."""
    _all_tasks.add(task)
    return None
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200956
957
def _enter_task(loop, task):
    """Record *task* as the one currently executing in *loop*.

    Raise RuntimeError if *loop* already has a current task.
    """
    running = _current_tasks.get(loop)
    if running is None:
        _current_tasks[loop] = task
        return
    raise RuntimeError(f"Cannot enter into task {task!r} while another "
                       f"task {running!r} is being executed.")
964
965
def _leave_task(loop, task):
    """Clear the current-task entry of *loop*.

    Raise RuntimeError if *task* is not the task currently recorded
    for *loop*.
    """
    running = _current_tasks.get(loop)
    if running is task:
        del _current_tasks[loop]
        return
    raise RuntimeError(f"Leaving task {task!r} does not match "
                       f"the current task {running!r}.")
972
973
def _unregister_task(task):
    """Remove *task* from the set of all tasks; a no-op if it is absent."""
    _all_tasks.discard(task)
    return None
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200977
978
# Keep explicit references to the pure-Python implementations defined
# above.  They must be captured *before* the import below, which may
# rebind the unprefixed names.
_py_register_task = _register_task
_py_unregister_task = _unregister_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task


# If the _asyncio module is importable, its versions of the helpers and
# of the task containers replace the module-level names defined above
# (the '_c_' aliases suggest it is the C accelerator).  Otherwise the
# pure-Python definitions stay in effect.
try:
    from _asyncio import (_register_task, _unregister_task,
                          _enter_task, _leave_task,
                          _all_tasks, _current_tasks)
except ImportError:
    pass
else:
    # Expose the imported implementations under explicit names too,
    # mirroring the '_py_' aliases.
    _c_register_task = _register_task
    _c_unregister_task = _unregister_task
    _c_enter_task = _enter_task
    _c_leave_task = _leave_task