"""Support for tasks, coroutines and the scheduler."""

# Names exported by ``from asyncio.tasks import *``.  The underscore-prefixed
# entries are listed explicitly so that they are re-exported as well.
__all__ = (
    'Task', 'create_task',
    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
    'wait', 'wait_for', 'as_completed', 'sleep',
    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
    'current_task', 'all_tasks',
    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import concurrent.futures
Yury Selivanovf23746a2018-01-22 19:11:18 -050013import contextvars
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import functools
15import inspect
Alex Grönholmcca4eec2018-08-09 00:06:47 +030016import itertools
Andrew Svetlov5f841b52017-12-09 00:23:48 +020017import types
Yury Selivanov59eb9a42015-05-11 14:48:38 -040018import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019import weakref
20
Yury Selivanova0c1ba62016-10-28 12:52:37 -040021from . import base_tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020022from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070024from . import exceptions
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025from . import futures
Andrew Svetlov68b34a72019-05-16 17:52:10 +030026from .coroutines import _is_coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027
# Helper to generate new task names.
# This uses itertools.count() instead of a "+= 1" operation because the latter
# is not thread safe. See bpo-11866 for a longer explanation.
# Each call returns the next integer, starting at 1.
_task_name_counter = itertools.count(1).__next__
32
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033
def current_task(loop=None):
    """Return the task currently executing on *loop*, or None.

    When *loop* is omitted the running event loop is used, obtained through
    events.get_running_loop().  The answer is whatever the module-level
    ``_current_tasks`` mapping holds for that loop; None is returned when
    the mapping has no entry for it.
    """
    if loop is None:
        loop = events.get_running_loop()
    return _current_tasks.get(loop)
39
40
def _snapshot_all_tasks():
    """Return a list copy of the module-level ``_all_tasks`` WeakSet.

    Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from
    another thread while we do so.  Therefore we cast it to list prior to
    filtering.  The list cast itself requires iteration, so we repeat it
    several times ignoring RuntimeErrors (which are not very likely to
    occur).  See issues 34970 and 36607 for details.

    The RuntimeError is re-raised once 1000 attempts have failed.
    """
    attempts = 0
    while True:
        try:
            return list(_all_tasks)
        except RuntimeError:
            attempts += 1
            if attempts >= 1000:
                raise


def all_tasks(loop=None):
    """Return a set of all not-yet-done tasks for the loop.

    When *loop* is omitted the running event loop is used, obtained through
    events.get_running_loop().
    """
    if loop is None:
        loop = events.get_running_loop()
    return {t for t in _snapshot_all_tasks()
            if futures._get_loop(t) is loop and not t.done()}


def _all_tasks_compat(loop=None):
    """Return a set of *all* tasks for the loop, finished ones included.

    Different from all_tasks() by returning the completed tasks too, and by
    defaulting *loop* to events.get_event_loop().  Used to implement the
    deprecated Task.all_tasks() method.
    """
    if loop is None:
        loop = events.get_event_loop()
    return {t for t in _snapshot_all_tasks() if futures._get_loop(t) is loop}
Andrew Svetlov44d1a592017-12-16 21:58:38 +020086
87
def _set_task_name(task, name):
    """Give *task* the name *name* if a name was supplied and is supported.

    Nothing happens when *name* is None, or when *task* has no ``set_name``
    attribute.  Only the attribute lookup is guarded: an exception raised by
    the ``set_name()`` call itself propagates to the caller.
    """
    if name is not None:
        try:
            set_name = task.set_name
        except AttributeError:
            pass
        else:
            set_name(name)
96
97
class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.

        Deprecated: emits a DeprecationWarning and delegates to the
        module-level current_task().
        """
        warnings.warn("Task.current_task() is deprecated since Python 3.7, "
                      "use asyncio.current_task() instead",
                      DeprecationWarning,
                      stacklevel=2)
        if loop is None:
            loop = events.get_event_loop()
        return current_task(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.

        Deprecated: emits a DeprecationWarning and delegates to
        _all_tasks_compat(), so finished tasks are included.
        """
        warnings.warn("Task.all_tasks() is deprecated since Python 3.7, "
                      "use asyncio.all_tasks() instead",
                      DeprecationWarning,
                      stacklevel=2)
        return _all_tasks_compat(loop)

    def __init__(self, coro, *, loop=None, name=None):
        """Wrap *coro* and schedule its first step on the loop.

        *name* is converted with str(); when it is None a 'Task-<n>' name
        is generated.  Raises TypeError if *coro* is not a coroutine.
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro
        # The coroutine always runs inside this copy of the creator's context.
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    def __del__(self):
        """Report a still-pending task to the loop's exception handler."""
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    def __class_getitem__(cls, type):
        """Allow ``Task[T]`` subscription; the class itself is returned."""
        return cls

    def _repr_info(self):
        """Return the repr() components, computed by base_tasks."""
        return base_tasks._task_repr_info(self)

    def get_coro(self):
        """Return the coroutine object wrapped by this task."""
        return self._coro

    def get_name(self):
        """Return the name of this task."""
        return self._name

    def set_name(self, value):
        """Set the name of this task to ``str(value)``."""
        self._name = str(value)

    def set_result(self, result):
        """Always raise RuntimeError: a Task's result comes from its coroutine."""
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        """Always raise RuntimeError: a Task's exception comes from its coroutine."""
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self, msg=None):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel(msg=msg):
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        self._cancel_message = msg
        return True

    def __step(self, exc=None):
        """Advance the coroutine by one step.

        Sends None into the coroutine, or throws *exc* into it when given,
        then either finishes the task or arranges for the next step
        depending on what the coroutine returned, raised or yielded.
        """
        if self.done():
            raise exceptions.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, exceptions.CancelledError):
                exc = self._make_cancelled_error()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().cancel(msg=self._cancel_message)
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError as exc:
            # Save the original exception so we can chain it later.
            self._cancelled_exc = exc
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
            # Record the exception on the task, then let it propagate.
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel(
                                    msg=self._cancel_message):
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        """Done-callback for the awaited future: resume the coroutine."""
        try:
            future.result()
        except BaseException as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.
369
370
# Keep a reference to the pure-Python implementation under a stable name
# before Task is possibly rebound to the C implementation below.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    # No C accelerator available: Task stays the pure-Python class.
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task
381
382
def create_task(coro, *, name=None):
    """Schedule the execution of a coroutine object in a new task.

    The task is created on the running event loop (see
    events.get_running_loop()) through ``loop.create_task(coro)``.  When
    *name* is not None it is applied afterwards via _set_task_name(), so
    task objects without ``set_name()`` are tolerated.

    Return a Task object.
    """
    loop = events.get_running_loop()
    task = loop.create_task(coro)
    _set_task_name(task, name)
    return task
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200392
393
# wait() and as_completed() similar to those in PEP 3148.

# The return_when constants are the very same objects concurrent.futures uses.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
399
400
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200401async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700402 """Wait for the Futures and coroutines given by fs to complete.
403
Victor Stinnerdb74d982014-06-10 11:16:05 +0200404 The sequence futures must not be empty.
405
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700406 Coroutines will be wrapped in Tasks.
407
408 Returns two sets of Future: (done, pending).
409
410 Usage:
411
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200412 done, pending = await asyncio.wait(fs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700413
414 Note: This does not raise TimeoutError! Futures that aren't done
415 when the timeout occurs are returned in the second set.
416 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700417 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Yury Selivanov6370f342017-12-10 18:36:12 -0500418 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700419 if not fs:
420 raise ValueError('Set of coroutines/Futures is empty.')
Victor Stinnere931f7b2014-07-16 18:50:39 +0200421 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
Yury Selivanov6370f342017-12-10 18:36:12 -0500422 raise ValueError(f'Invalid return_when value: {return_when}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700423
424 if loop is None:
João Júnior558c49b2018-09-24 06:51:22 -0300425 loop = events.get_running_loop()
426 else:
Matthias Bussonnierd0ebf132019-05-20 23:20:10 -0700427 warnings.warn("The loop argument is deprecated since Python 3.8, "
428 "and scheduled for removal in Python 3.10.",
João Júnior558c49b2018-09-24 06:51:22 -0300429 DeprecationWarning, stacklevel=2)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700430
Kyle Stanley89aa7f02019-12-30 06:50:19 -0500431 if any(coroutines.iscoroutine(f) for f in set(fs)):
432 warnings.warn("The explicit passing of coroutine objects to "
433 "asyncio.wait() is deprecated since Python 3.8, and "
434 "scheduled for removal in Python 3.11.",
435 DeprecationWarning, stacklevel=2)
436
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400437 fs = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700438
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200439 return await _wait(fs, timeout, return_when, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700440
441
def _release_waiter(waiter, *args):
    """Complete *waiter* with a None result unless it is already done.

    Extra positional arguments are accepted and ignored, so the function
    can be used directly as a done-callback or a timer callback.
    """
    if not waiter.done():
        waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700445
446
async def wait_for(fut, timeout, *, loop=None):
    """Wait for the single Future or coroutine to complete, with timeout.

    Coroutine will be wrapped in Task.

    Returns result of the Future or coroutine.  When a timeout occurs,
    it cancels the task and raises TimeoutError.  To avoid the task
    cancellation, wrap it in shield().

    If the wait is cancelled, the task is also cancelled.

    A *timeout* of None awaits *fut* directly with no time limit.  A
    *timeout* <= 0 returns the result if *fut* is already done, and
    otherwise cancels it and raises TimeoutError once the cancellation
    has completed.

    This function is a coroutine.
    """
    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    if timeout is None:
        return await fut

    if timeout <= 0:
        fut = ensure_future(fut, loop=loop)

        if fut.done():
            return fut.result()

        # Same guarantee as the timed path below: the task must not be
        # running after wait_for() returns, and an exception raised while
        # it handles the cancellation must not be lost.
        # See https://bugs.python.org/issue32751
        await _cancel_and_wait(fut, loop=loop)
        try:
            fut.result()
        except exceptions.CancelledError as exc:
            raise exceptions.TimeoutError() from exc
        else:
            raise exceptions.TimeoutError()

    # *waiter* is released either by the timer or by *fut* completing.
    waiter = loop.create_future()
    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    cb = functools.partial(_release_waiter, waiter)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(cb)

    try:
        # wait until the future completes or the timeout
        try:
            await waiter
        except exceptions.CancelledError:
            fut.remove_done_callback(cb)
            fut.cancel()
            raise

        if fut.done():
            return fut.result()
        else:
            fut.remove_done_callback(cb)
            # We must ensure that the task is not running
            # after wait_for() returns.
            # See https://bugs.python.org/issue32751
            await _cancel_and_wait(fut, loop=loop)
            # In case task cancellation failed with some
            # exception, we should re-raise it
            # See https://bugs.python.org/issue40607
            try:
                fut.result()
            except exceptions.CancelledError as exc:
                raise exceptions.TimeoutError() from exc
            else:
                raise exceptions.TimeoutError()
    finally:
        timeout_handle.cancel()
514
515
async def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a collection of Futures.

    Returns a ``(done, pending)`` pair of sets partitioning *fs* once the
    *return_when* condition is met or *timeout* (if not None) has elapsed.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    timeout_handle = None
    if timeout is not None:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)  # number of futures that have not completed yet

    def _on_completion(f):
        nonlocal counter
        counter -= 1
        # Wake the waiter when everything is done, on the first completion
        # (FIRST_COMPLETED), or on the first future that finished with an
        # exception rather than a cancellation (FIRST_EXCEPTION).
        if (counter <= 0 or
                return_when == FIRST_COMPLETED or
                (return_when == FIRST_EXCEPTION and
                 not f.cancelled() and f.exception() is not None)):
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        await waiter
    finally:
        # Runs on cancellation too, so no callback or timer is left behind.
        if timeout_handle is not None:
            timeout_handle.cancel()
        for f in fs:
            f.remove_done_callback(_on_completion)

    done, pending = set(), set()
    for f in fs:
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending
558
559
async def _cancel_and_wait(fut, loop):
    """Cancel the *fut* future or task and wait until it completes."""

    # A separate waiter future is released by *fut*'s done-callback.
    waiter = loop.create_future()
    cb = functools.partial(_release_waiter, waiter)
    fut.add_done_callback(cb)

    try:
        fut.cancel()
        # We cannot wait on *fut* directly to make
        # sure _cancel_and_wait itself is reliably cancellable.
        await waiter
    finally:
        fut.remove_done_callback(cb)
573 fut.remove_done_callback(cb)
574
575
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700576# This is *not* a @coroutine! It is just an iterator (yielding Futures).
577def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800578 """Return an iterator whose values are coroutines.
579
580 When waiting for the yielded coroutines you'll get the results (or
581 exceptions!) of the original Futures (or coroutines), in the order
582 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700583
584 This differs from PEP 3148; the proper way to use this is:
585
586 for f in as_completed(fs):
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200587 result = await f # The 'await' may raise.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700588 # Use result.
589
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200590 If a timeout is specified, the 'await' will raise
Guido van Rossumb58f0532014-02-12 17:58:19 -0800591 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700592
593 Note: The futures 'f' are not necessarily members of fs.
594 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700595 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Yury Selivanov6370f342017-12-10 18:36:12 -0500596 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
Andrew Svetlova4888792019-09-12 15:40:40 +0300597
Guido van Rossumb58f0532014-02-12 17:58:19 -0800598 from .queues import Queue # Import here to avoid circular import problem.
599 done = Queue(loop=loop)
Andrew Svetlova4888792019-09-12 15:40:40 +0300600
601 if loop is None:
602 loop = events.get_event_loop()
603 else:
604 warnings.warn("The loop argument is deprecated since Python 3.8, "
605 "and scheduled for removal in Python 3.10.",
606 DeprecationWarning, stacklevel=2)
607 todo = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800608 timeout_handle = None
609
610 def _on_timeout():
611 for f in todo:
612 f.remove_done_callback(_on_completion)
613 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
614 todo.clear() # Can't do todo.remove(f) in the loop.
615
616 def _on_completion(f):
617 if not todo:
618 return # _on_timeout() was here first.
619 todo.remove(f)
620 done.put_nowait(f)
621 if not todo and timeout_handle is not None:
622 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700623
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200624 async def _wait_for_one():
625 f = await done.get()
Guido van Rossumb58f0532014-02-12 17:58:19 -0800626 if f is None:
627 # Dummy value from _on_timeout().
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700628 raise exceptions.TimeoutError
Guido van Rossumb58f0532014-02-12 17:58:19 -0800629 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700630
Guido van Rossumb58f0532014-02-12 17:58:19 -0800631 for f in todo:
632 f.add_done_callback(_on_completion)
633 if todo and timeout is not None:
634 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700635 for _ in range(len(todo)):
636 yield _wait_for_one()
637
638
@types.coroutine
def __sleep0():
    """Give control back to the event loop for one run cycle.

    Private helper for 'asyncio.sleep()', which awaits it when
    'delay' is zero or negative.  Rather than creating a Future
    object it relies on a bare 'yield' expression (which
    Task.__step knows how to handle).
    """
    yield
649
650
async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    *delay* is the time to wait; when it is zero or negative the
    coroutine only yields to the event loop once.  *result* is
    returned to the caller when the coroutine completes.

    *loop* is deprecated; passing it emits a DeprecationWarning.

    Raises ValueError if *delay* is NaN.
    """
    # Warn on every use of the deprecated argument, including the
    # zero-delay fast path below.
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    if delay <= 0:
        await __sleep0()
        return result

    # NaN is the only value that compares unequal to itself.  It fails
    # the 'delay <= 0' test above and would otherwise be handed to
    # call_later() as a deadline that cannot be ordered.
    if delay != delay:
        raise ValueError("Invalid delay: NaN (not a number)")

    if loop is None:
        loop = events.get_running_loop()

    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        # Drop the timer if we are cancelled before it fires.
        h.cancel()
672
673
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    A coroutine is scheduled as a Task on *loop* (or on the loop
    returned by events.get_event_loop() when *loop* is None).
    If the argument is a Future, it is returned directly.
    Any other awaitable is first wrapped in a coroutine.

    Raises ValueError if a Future is passed together with a *loop*
    it does not belong to, and TypeError if the argument is not
    awaitable at all.
    """
    if coroutines.iscoroutine(coro_or_future):
        target_loop = events.get_event_loop() if loop is None else loop
        task = target_loop.create_task(coro_or_future)
        if task._source_traceback:
            # Drop the last recorded frame (the create_task() call above).
            del task._source_traceback[-1]
        return task

    if futures.isfuture(coro_or_future):
        if loop is None or loop is futures._get_loop(coro_or_future):
            return coro_or_future
        raise ValueError('The future belongs to a different loop than '
                         'the one specified as the loop argument')

    if not inspect.isawaitable(coro_or_future):
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')
    return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
Yury Selivanov620279b2015-10-02 15:00:19 -0400696
697
@types.coroutine
def _wrap_awaitable(awaitable):
    """Helper for asyncio.ensure_future().

    Turns an awaitable (an object with __await__) into a coroutine
    by delegating to the iterator its __await__() returns; that
    coroutine is later wrapped in a Task by ensure_future().
    """
    outcome = yield from awaitable.__await__()
    return outcome

# Tag the wrapper with the marker object imported from .coroutines.
_wrap_awaitable._is_coroutine = _is_coroutine
708
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700709
class _GatheringFuture(futures.Future):
    """Helper for gather().

    cancel() is overridden so that it cancels all the children
    instead of this future itself.  This makes it act more like
    Task.cancel(), which doesn't immediately mark itself as
    cancelled.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children
        # Set once cancel() has actually cancelled at least one child.
        self._cancel_requested = False

    def cancel(self, msg=None):
        """Cancel every child; return True if any child was cancelled."""
        if self.done():
            return False
        # Build a list (not a lazy generator) so that cancel() is
        # called on every child before the outcomes are inspected.
        outcomes = [child.cancel(msg=msg) for child in self._children]
        cancelled_any = any(outcomes)
        if cancelled_any:
            # If any child tasks were actually cancelled, we should
            # propagate the cancellation request regardless of
            # *return_exceptions* argument.  See issue 32684.
            self._cancel_requested = True
        return cancelled_any
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700736
737
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines/futures.

    Coroutines will be wrapped in a future and scheduled in the event
    loop. They will not necessarily be scheduled in the same order as
    passed in.

    All futures must share the same event loop.  If all the tasks are
    done successfully, the returned future's result is the list of
    results (in the order of the original sequence, not necessarily
    the order of results arrival).  If *return_exceptions* is True,
    exceptions in the tasks are treated the same as successful
    results, and gathered in the result list; otherwise, the first
    raised exception will be immediately propagated to the returned
    future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)

    The *loop* argument is deprecated; passing it emits a
    DeprecationWarning.
    """
    # Warn on every use of the deprecated argument, not only when
    # gather() is called without any awaitables.
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    if not coros_or_futures:
        # Nothing to wait for: hand back an already-completed future.
        if loop is None:
            loop = events.get_event_loop()
        outer = loop.create_future()
        outer.set_result([])
        return outer

    def _done_callback(fut):
        nonlocal nfinished
        nfinished += 1

        if outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if not return_exceptions:
            if fut.cancelled():
                # Check if 'fut' is cancelled first, as
                # 'fut.exception()' will *raise* a CancelledError
                # instead of returning it.
                exc = fut._make_cancelled_error()
                outer.set_exception(exc)
                return
            else:
                exc = fut.exception()
                if exc is not None:
                    outer.set_exception(exc)
                    return

        if nfinished == nfuts:
            # All futures are done; create a list of results
            # and set it to the 'outer' future.
            results = []

            for fut in children:
                if fut.cancelled():
                    # Check if 'fut' is cancelled first, as 'fut.exception()'
                    # will *raise* a CancelledError instead of returning it.
                    # Also, since we're adding the exception return value
                    # to 'results' instead of raising it, don't bother
                    # setting __context__.  This also lets us preserve
                    # calling '_make_cancelled_error()' at most once.
                    res = exceptions.CancelledError(
                        '' if fut._cancel_message is None else
                        fut._cancel_message)
                else:
                    res = fut.exception()
                    if res is None:
                        res = fut.result()
                results.append(res)

            if outer._cancel_requested:
                # If gather is being cancelled we must propagate the
                # cancellation regardless of *return_exceptions* argument.
                # See issue 32684.
                # NOTE: 'fut' was rebound by the loop above, so the error
                # is built from the last entry of 'children'.
                exc = fut._make_cancelled_error()
                outer.set_exception(exc)
            else:
                outer.set_result(results)

    arg_to_fut = {}
    children = []
    nfuts = 0
    nfinished = 0
    for arg in coros_or_futures:
        if arg not in arg_to_fut:
            fut = ensure_future(arg, loop=loop)
            if loop is None:
                # Adopt the loop of the first child for everything else.
                loop = futures._get_loop(fut)
            if fut is not arg:
                # 'arg' was not a Future, therefore, 'fut' is a new
                # Future created specifically for 'arg'.  Since the caller
                # can't control it, disable the "destroy pending task"
                # warning.
                fut._log_destroy_pending = False

            nfuts += 1
            arg_to_fut[arg] = fut
            fut.add_done_callback(_done_callback)

        else:
            # There's a duplicate Future object in coros_or_futures.
            fut = arg_to_fut[arg]

        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    return outer
855
856
def shield(arg, *, loop=None):
    """Wait for a future, shielding it from cancellation.

    The statement

        res = await shield(something())

    is exactly equivalent to the statement

        res = await something()

    *except* that if the coroutine containing it is cancelled, the
    task running in something() is not cancelled.  From the POV of
    something(), the cancellation did not happen.  But its caller is
    still cancelled, so the 'await' expression still raises
    CancelledError.  Note: If something() is cancelled by other means
    this will still cancel shield().

    If you want to completely ignore cancellation (not recommended)
    you can combine shield() with a try/except clause, as follows:

        try:
            res = await shield(something())
        except CancelledError:
            res = None
    """
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Shortcut: nothing left to protect.
        return inner
    loop = futures._get_loop(inner)
    outer = loop.create_future()

    def _relay_inner_outcome(finished):
        # Copy the state of the finished inner future onto 'outer'.
        if outer.cancelled():
            if not finished.cancelled():
                # Mark inner's result as retrieved.
                finished.exception()
            return

        if finished.cancelled():
            outer.cancel()
            return

        error = finished.exception()
        if error is None:
            outer.set_result(finished.result())
        else:
            outer.set_exception(error)

    def _detach_from_inner(_finished_outer):
        # Once 'outer' is done, stop listening to a still-pending inner.
        if not inner.done():
            inner.remove_done_callback(_relay_inner_outcome)

    inner.add_done_callback(_relay_inner_outcome)
    outer.add_done_callback(_detach_from_inner)
    return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700918
919
def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    Return a concurrent.futures.Future to access the result.
    Raises TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    result_future = concurrent.futures.Future()

    def _schedule():
        # Runs via call_soon_threadsafe(): start the coroutine and
        # mirror its outcome onto 'result_future'.
        try:
            task = ensure_future(coro, loop=loop)
            futures._chain_future(task, result_future)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # Report the failure to the waiting caller, unless the
            # future has already been cancelled, then re-raise.
            if result_future.set_running_or_notify_cancel():
                result_future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(_schedule)
    return result_future
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200941
942
# Tasks that are currently active in all running event loops,
# stored as a mapping {EventLoop: Task}.
_current_tasks = {}

# All alive tasks, kept in a WeakSet.
_all_tasks = weakref.WeakSet()
949
950
def _register_task(task):
    """Add *task* to the module-level set of all tasks."""
    _all_tasks.add(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200954
955
def _enter_task(loop, task):
    """Record *task* as the current task of *loop*.

    Raises RuntimeError if *loop* already has a current task.
    """
    running = _current_tasks.get(loop)
    if running is None:
        _current_tasks[loop] = task
        return
    raise RuntimeError(f"Cannot enter into task {task!r} while another "
                       f"task {running!r} is being executed.")
961 _current_tasks[loop] = task
962
963
def _leave_task(loop, task):
    """Clear the current task of *loop*, which must be *task*.

    Raises RuntimeError if the current task of *loop* is not *task*.
    """
    running = _current_tasks.get(loop)
    if running is task:
        del _current_tasks[loop]
        return
    raise RuntimeError(f"Leaving task {task!r} does not match "
                       f"the current task {running!r}.")
969 del _current_tasks[loop]
970
971
def _unregister_task(task):
    """Remove *task* from the set of all tasks, if it is present."""
    _all_tasks.discard(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200975
976
# Keep references to the pure-Python implementations under '_py_*'
# names; the import below may rebind the plain names.
_py_register_task, _py_unregister_task = _register_task, _unregister_task
_py_enter_task, _py_leave_task = _enter_task, _leave_task


try:
    from _asyncio import (_register_task, _unregister_task,
                          _enter_task, _leave_task,
                          _all_tasks, _current_tasks)
except ImportError:
    # No _asyncio module: the Python versions above stay in use.
    pass
else:
    # The plain names now refer to the _asyncio versions; expose them
    # under '_c_*' names as well.
    _c_register_task, _c_unregister_task = _register_task, _unregister_task
    _c_enter_task, _c_leave_task = _enter_task, _leave_task