blob: a0cb884eacab7c9486737970fb115584e72ddd11 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
# Names exported by ``from <this module> import *``.
# The tuple lists the public task API first and ends with four
# underscore-prefixed task bookkeeping hooks that are exported explicitly.
__all__ = (
    # Task class and the task factory helper.
    'Task', 'create_task',
    # ``return_when`` constants accepted by wait().
    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
    # Waiting / timing primitives.
    'wait', 'wait_for', 'as_completed', 'sleep',
    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
    # Task introspection.
    'current_task', 'all_tasks',
    # Private hooks; exported despite the leading underscore.
    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import concurrent.futures
Yury Selivanovf23746a2018-01-22 19:11:18 -050013import contextvars
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import functools
15import inspect
Alex Grönholmcca4eec2018-08-09 00:06:47 +030016import itertools
Andrew Svetlov5f841b52017-12-09 00:23:48 +020017import types
Yury Selivanov59eb9a42015-05-11 14:48:38 -040018import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019import weakref
20
Yury Selivanova0c1ba62016-10-28 12:52:37 -040021from . import base_tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020022from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070024from . import exceptions
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025from . import futures
Andrew Svetlov68b34a72019-05-16 17:52:10 +030026from .coroutines import _is_coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027
Alex Grönholmcca4eec2018-08-09 00:06:47 +030028# Helper to generate new task names
29# This uses itertools.count() instead of a "+= 1" operation because the latter
30# is not thread safe. See bpo-11866 for a longer explanation.
31_task_name_counter = itertools.count(1).__next__
32
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033
def current_task(loop=None):
    """Return the task currently being executed on *loop*.

    When *loop* is omitted the running event loop is used.  The result is
    whatever the module-level ``_current_tasks`` mapping holds for that
    loop, i.e. ``None`` when no entry exists.
    """
    target_loop = events.get_running_loop() if loop is None else loop
    return _current_tasks.get(target_loop)
39
40
def all_tasks(loop=None):
    """Return the set of not-yet-finished tasks that belong to *loop*.

    When *loop* is omitted the running event loop is used.
    """
    if loop is None:
        loop = events.get_running_loop()

    # _all_tasks is a WeakSet that another thread may mutate while we walk
    # it, so it is copied into a list before any filtering happens.  The copy
    # has to iterate as well and can therefore fail with RuntimeError (not
    # very likely); it is retried, and only the 1000th consecutive failure
    # is allowed to propagate.  See issues 34970 and 36607 for details.
    for attempt in range(1, 1001):
        try:
            snapshot = list(_all_tasks)
        except RuntimeError:
            if attempt == 1000:
                raise
        else:
            break

    unfinished = set()
    for task in snapshot:
        if futures._get_loop(task) is not loop:
            continue
        if task.done():
            continue
        unfinished.add(task)
    return unfinished
62
63
def _all_tasks_compat(loop=None):
    # Unlike all_tasks(), this returns *every* task attached to the loop,
    # finished ones included.  It backs the deprecated Task.all_tasks()
    # class method.  When no loop is given, events.get_event_loop() supplies
    # it.
    if loop is None:
        loop = events.get_event_loop()

    # _all_tasks is a WeakSet that another thread may mutate while we walk
    # it, so it is copied into a list before any filtering happens.  The copy
    # has to iterate as well and can therefore fail with RuntimeError (not
    # very likely); it is retried, and only the 1000th consecutive failure
    # is allowed to propagate.  See issues 34970 and 36607 for details.
    for attempt in range(1, 1001):
        try:
            snapshot = list(_all_tasks)
        except RuntimeError:
            if attempt == 1000:
                raise
        else:
            break

    owned = set()
    for task in snapshot:
        if futures._get_loop(task) is loop:
            owned.add(task)
    return owned
Andrew Svetlov44d1a592017-12-16 21:58:38 +020086
87
Alex Grönholmcca4eec2018-08-09 00:06:47 +030088def _set_task_name(task, name):
89 if name is not None:
90 try:
91 set_name = task.set_name
92 except AttributeError:
93 pass
94 else:
95 set_name(name)
96
97
class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup(). When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        # Deprecated entry point: warn, resolve the default loop, then
        # delegate to the module-level current_task().
        warnings.warn("Task.current_task() is deprecated since Python 3.7, "
                      "use asyncio.current_task() instead",
                      DeprecationWarning,
                      stacklevel=2)
        if loop is None:
            loop = events.get_event_loop()
        return current_task(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        # Deprecated entry point: warn, then delegate to the compatibility
        # helper, which does not filter out finished tasks.
        warnings.warn("Task.all_tasks() is deprecated since Python 3.7, "
                      "use asyncio.all_tasks() instead",
                      DeprecationWarning,
                      stacklevel=2)
        return _all_tasks_compat(loop)

    def __init__(self, coro, *, loop=None, name=None):
        """Wrap the coroutine object *coro* and schedule its first step.

        *name* is converted with str(); when it is None a name of the form
        'Task-<n>' is generated.  Raises TypeError if *coro* is not a
        coroutine according to coroutines.iscoroutine().
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the innermost recorded frame from the source traceback.
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        # Set by cancel() when the cancellation has to be delivered by the
        # next __step() call.
        self._must_cancel = False
        # The future this task is currently blocked on, if any (see the
        # invariant described at the top of the class).
        self._fut_waiter = None
        self._coro = coro
        # Snapshot of the current context; every scheduled step/wakeup
        # callback is registered with it.
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    def __del__(self):
        """Report a still-pending task to the loop's exception handler."""
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    def _repr_info(self):
        """Return the repr details, as computed by base_tasks."""
        return base_tasks._task_repr_info(self)

    def get_coro(self):
        """Return the coroutine object wrapped by this task."""
        return self._coro

    def get_name(self):
        """Return the task's name."""
        return self._name

    def set_name(self, value):
        """Set the task's name to str(value)."""
        self._name = str(value)

    def set_result(self, result):
        """Unsupported on tasks; always raises RuntimeError."""
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        """Unsupported on tasks; always raises RuntimeError."""
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended. If the coroutine has completed successfully or was
        cancelled, this returns an empty list. If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned. Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned. (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack(). The limit argument
        is passed to get_stack(). The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely. The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled). A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            # Blocked on a future: try to cancel that future first.
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        return True

    def __step(self, exc=None):
        """Advance the wrapped coroutine by one step.

        The coroutine is resumed with send(None), or with throw(exc) when
        *exc* is given or a cancellation is pending.  Depending on how the
        coroutine responds, the task is finished (result, exception or
        cancellation), registers __wakeup() on the yielded future, or
        schedules another __step() call on the loop.

        Raises InvalidStateError if the task is already done.
        """
        if self.done():
            raise exceptions.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            # A pending cancel() request replaces whatever was going to be
            # delivered, unless that already is a CancelledError.
            if not isinstance(exc, exceptions.CancelledError):
                exc = exceptions.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            # The coroutine returned; exc.value carries its return value.
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().set_exception(exceptions.CancelledError())
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
            # Record the exception on the task, then let it propagate to
            # the caller of __step().
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            # The coroutine yielded `result` instead of finishing.
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        # Wait on the future: clear its blocking flag and
                        # resume through __wakeup() once it is done.
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            # cancel() was requested while the coroutine
                            # was running; forward it to the new waiter.
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    # Future-like object whose blocking flag is falsy.
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        """Done-callback for the awaited *future*: resume the coroutine.

        If *future* finished with an exception (including cancellation),
        that exception is passed to __step(); otherwise __step() is called
        without arguments.
        """
        try:
            future.result()
        except BaseException as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.
362
363
# Keep a reference to the pure-Python implementation under its own name;
# the public name "Task" may be rebound just below.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    # The optional _asyncio module is not importable: keep the Python Task.
    pass
else:
    # _CTask is needed for tests.
    _CTask = _asyncio.Task
    Task = _CTask
374
375
def create_task(coro, *, name=None):
    """Schedule the coroutine object *coro* for execution as a new task.

    The task is created on the running event loop.  When *name* is not
    None it is applied to the task afterwards, provided the task object
    supports naming.

    Return the Task object.
    """
    new_task = events.get_running_loop().create_task(coro)
    _set_task_name(new_task, name)
    return new_task
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200385
386
# wait() and as_completed() similar to those in PEP 3148.

# The ``return_when`` selectors are the very same objects that
# concurrent.futures defines, re-exported under this module's namespace.
FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED = (
    concurrent.futures.FIRST_COMPLETED,
    concurrent.futures.FIRST_EXCEPTION,
    concurrent.futures.ALL_COMPLETED,
)
392
393
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures and coroutines given by fs to complete.

    The collection must not be empty, and must not itself be a single
    future or coroutine.

    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = await asyncio.wait(fs)

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.

    Raises TypeError when fs is a single future/coroutine, and ValueError
    when fs is empty or return_when is not one of FIRST_COMPLETED,
    FIRST_EXCEPTION, ALL_COMPLETED.
    """
    # Argument validation, in this order: shape of fs, emptiness, mode.
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError(f'Invalid return_when value: {return_when}')

    if loop is not None:
        # An explicitly supplied loop still works but is deprecated.
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_running_loop()

    # De-duplicate the inputs, then wrap each one into a future on the loop.
    wrapped = set()
    for item in set(fs):
        wrapped.add(ensure_future(item, loop=loop))

    return await _wait(wrapped, timeout, return_when, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700427
428
Victor Stinner59e08022014-08-28 11:19:25 +0200429def _release_waiter(waiter, *args):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700430 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200431 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700432
433
async def wait_for(fut, timeout, *, loop=None):
    """Wait for the single Future or coroutine to complete, with timeout.

    Coroutine will be wrapped in Task.

    Returns result of the Future or coroutine. When a timeout occurs,
    it cancels the task and raises TimeoutError. To avoid the task
    cancellation, wrap it in shield().

    A timeout of None awaits *fut* directly with no time limit.  A timeout
    that is zero or negative returns the result only if it is already
    available; otherwise the task is cancelled and TimeoutError is raised
    at once.

    If the wait is cancelled, the task is also cancelled.

    This function is a coroutine.
    """
    if loop is not None:
        # An explicitly supplied loop still works but is deprecated.
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_running_loop()

    if timeout is None:
        return await fut

    if timeout <= 0:
        fut = ensure_future(fut, loop=loop)

        if not fut.done():
            fut.cancel()
            raise exceptions.TimeoutError()

        return fut.result()

    # "gate" is completed either by the timer or by fut finishing,
    # whichever happens first.
    gate = loop.create_future()
    timer = loop.call_later(timeout, _release_waiter, gate)
    on_done = functools.partial(_release_waiter, gate)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(on_done)

    try:
        # wait until the future completes or the timeout
        try:
            await gate
        except exceptions.CancelledError:
            # wait_for() itself was cancelled: pass the cancellation on.
            fut.remove_done_callback(on_done)
            fut.cancel()
            raise

        if fut.done():
            return fut.result()

        # The timer fired first.
        fut.remove_done_callback(on_done)
        # We must ensure that the task is not running
        # after wait_for() returns.
        # See https://bugs.python.org/issue32751
        await _cancel_and_wait(fut, loop=loop)
        raise exceptions.TimeoutError()
    finally:
        timer.cancel()
493
494
async def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a non-empty collection of Futures.  Returns
    the pair (done, pending) of sets describing their state at the moment
    the wait ended.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    timeout_handle = None
    if timeout is not None:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    outstanding = len(fs)

    def _on_completion(f):
        nonlocal outstanding
        outstanding -= 1
        # Decide whether this completion ends the wait.  The checks are
        # made lazily and in this order, so f.exception() is consulted
        # only in FIRST_EXCEPTION mode for a future that was not cancelled.
        if outstanding <= 0:
            wake = True
        elif return_when == FIRST_COMPLETED:
            wake = True
        elif return_when == FIRST_EXCEPTION:
            wake = not f.cancelled() and f.exception() is not None
        else:
            wake = False
        if not wake:
            return
        if timeout_handle is not None:
            timeout_handle.cancel()
        if not waiter.done():
            waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        await waiter
    finally:
        # Always undo our registrations, even when the wait is interrupted.
        if timeout_handle is not None:
            timeout_handle.cancel()
        for f in fs:
            f.remove_done_callback(_on_completion)

    done, pending = set(), set()
    for f in fs:
        bucket = done if f.done() else pending
        bucket.add(f)
    return done, pending
537
538
async def _cancel_and_wait(fut, loop):
    """Cancel the *fut* future or task and wait until it completes."""

    finished = loop.create_future()
    notify = functools.partial(_release_waiter, finished)
    fut.add_done_callback(notify)

    try:
        fut.cancel()
        # Await the private "finished" future rather than *fut* itself, so
        # that _cancel_and_wait remains reliably cancellable.
        await finished
    finally:
        fut.remove_done_callback(notify)
553
554
# This is *not* a @coroutine! It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    Awaiting the yielded coroutines gives the results (or raises the
    exceptions!) of the original Futures (or coroutines), in the order
    in which and as soon as they complete.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = await f  # The 'await' may raise.
            # Use result.

    If a timeout is specified, the 'await' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")

    from .queues import Queue  # Import here to avoid circular import problem.
    # NOTE(review): the queue receives the caller's *loop* value as given,
    # before the default is resolved below -- presumably deliberate; confirm
    # before reordering these statements.
    finished = Queue(loop=loop)

    if loop is not None:
        # An explicitly supplied loop still works but is deprecated.
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_event_loop()
    remaining = {ensure_future(f, loop=loop) for f in set(fs)}
    timeout_handle = None

    def _on_timeout():
        # Detach from everything still outstanding and wake one consumer.
        for f in remaining:
            f.remove_done_callback(_on_completion)
        finished.put_nowait(None)  # Queue a dummy value for _wait_for_one().
        remaining.clear()  # Can't do remaining.remove(f) in the loop.

    def _on_completion(f):
        if not remaining:
            return  # _on_timeout() was here first.
        remaining.remove(f)
        finished.put_nowait(f)
        if not remaining and timeout_handle is not None:
            # Everything completed in time; the timer is no longer needed.
            timeout_handle.cancel()

    async def _wait_for_one():
        f = await finished.get()
        if f is None:
            # Dummy value from _on_timeout().
            raise exceptions.TimeoutError
        return f.result()  # May raise f.exception().

    for f in remaining:
        f.add_done_callback(_on_completion)
    if remaining and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    # One awaitable per input; the count is fixed before the first yield.
    for _ in range(len(remaining)):
        yield _wait_for_one()
616
617
@types.coroutine
def __sleep0():
    """Skip one event loop run cycle.

    This is a private helper for 'asyncio.sleep()', used
    when the 'delay' is zero or negative.  It uses a bare 'yield'
    expression (which Task.__step knows how to handle)
    instead of creating a Future object.
    """
    # The bare yield suspends this generator-based coroutine exactly once;
    # when it is resumed the function simply finishes.
    yield
628
629
async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    *result* is handed back to the caller once the delay has elapsed.
    A *delay* of zero or less does not arm a timer; it only awaits the
    private __sleep0() helper and then returns *result*.

    Passing *loop* is deprecated and emits a DeprecationWarning; when it
    is omitted the running event loop is used.
    """
    # Warn before the zero-delay shortcut so that the deprecated argument
    # is reported regardless of which path is taken.
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    if delay <= 0:
        await __sleep0()
        return result

    if loop is None:
        loop = events.get_running_loop()

    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        # Always drop the timer handle, including when the await is
        # interrupted by an exception such as cancellation.
        h.cancel()
651
652
def ensure_future(coro_or_future, *, loop=None):
    """Return a future-like object for a coroutine, future or awaitable.

    A coroutine is scheduled as a task on *loop* (or on the loop returned
    by events.get_event_loop() when *loop* is None).  A Future is returned
    unchanged, after checking that it belongs to *loop* if one was given.
    Any other awaitable is wrapped in a coroutine first and then handled
    like a coroutine.

    Raises ValueError for a future bound to a different loop and
    TypeError for an argument that is none of the above.
    """
    if coroutines.iscoroutine(coro_or_future):
        target_loop = events.get_event_loop() if loop is None else loop
        task = target_loop.create_task(coro_or_future)
        if task._source_traceback:
            # Hide this helper's own frame from the recorded traceback.
            del task._source_traceback[-1]
        return task

    if futures.isfuture(coro_or_future):
        if loop is not None:
            if futures._get_loop(coro_or_future) is not loop:
                raise ValueError('The future belongs to a different loop than '
                                 'the one specified as the loop argument')
        return coro_or_future

    if not inspect.isawaitable(coro_or_future):
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')

    return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
Yury Selivanov620279b2015-10-02 15:00:19 -0400675
676
@types.coroutine
def _wrap_awaitable(awaitable):
    """Adapt an object with __await__ into a generator-based coroutine.

    Helper for asyncio.ensure_future(): the coroutine produced here is
    what ensure_future() later wraps in a Task.  It delegates to the
    iterator returned by awaitable.__await__() and returns its final value.
    """
    outcome = yield from awaitable.__await__()
    return outcome


# Tag the helper with the _is_coroutine marker imported from .coroutines.
_wrap_awaitable._is_coroutine = _is_coroutine
687
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700688
class _GatheringFuture(futures.Future):
    """Outer future returned by gather().

    cancel() is overridden so that a cancellation request is forwarded
    to the children instead of marking this future cancelled right away,
    which is closer to how Task.cancel() behaves.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children
        # Set once a cancel() call managed to cancel at least one child.
        self._cancel_requested = False

    def cancel(self):
        """Ask every child to cancel; return True if any of them accepted."""
        if self.done():
            return False

        # Build the complete list first: each child must receive the
        # request, so the calls must not stop at the first success.
        outcomes = [child.cancel() for child in self._children]
        cancelled_any = any(outcomes)

        if cancelled_any:
            # If any child tasks were actually cancelled, we should
            # propagate the cancellation request regardless of
            # *return_exceptions* argument.  See issue 32684.
            self._cancel_requested = True
        return cancelled_any
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700715
716
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines/futures.

    Coroutines will be wrapped in a future and scheduled in the event
    loop. They will not necessarily be scheduled in the same order as
    passed in.

    All futures must share the same event loop.  If all the tasks are
    done successfully, the returned future's result is the list of
    results (in the order of the original sequence, not necessarily
    the order of results arrival).  If *return_exceptions* is True,
    exceptions in the tasks are treated the same as successful
    results, and gathered in the result list; otherwise, the first
    raised exception will be immediately propagated to the returned
    future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)

    Passing *loop* is deprecated and emits a DeprecationWarning, whether
    or not any coroutines/futures are given.
    """
    # Report the deprecated argument up front so the warning does not
    # depend on whether any awaitables were passed.
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    if not coros_or_futures:
        # Nothing to wait for: hand back an already-completed future.
        if loop is None:
            loop = events.get_event_loop()
        outer = loop.create_future()
        outer.set_result([])
        return outer

    def _done_callback(fut):
        # 'outer', 'children' and 'nfuts' are bound further down in
        # gather(); this callback only reads them when it is invoked.
        nonlocal nfinished
        nfinished += 1

        if outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if not return_exceptions:
            if fut.cancelled():
                # Check if 'fut' is cancelled first, as
                # 'fut.exception()' will *raise* a CancelledError
                # instead of returning it.
                exc = exceptions.CancelledError()
                outer.set_exception(exc)
                return
            else:
                exc = fut.exception()
                if exc is not None:
                    outer.set_exception(exc)
                    return

        if nfinished == nfuts:
            # All futures are done; create a list of results
            # and set it to the 'outer' future.
            results = []

            for fut in children:
                if fut.cancelled():
                    # Check if 'fut' is cancelled first, as
                    # 'fut.exception()' will *raise* a CancelledError
                    # instead of returning it.
                    res = exceptions.CancelledError()
                else:
                    res = fut.exception()
                    if res is None:
                        res = fut.result()
                results.append(res)

            if outer._cancel_requested:
                # If gather is being cancelled we must propagate the
                # cancellation regardless of *return_exceptions* argument.
                # See issue 32684.
                outer.set_exception(exceptions.CancelledError())
            else:
                outer.set_result(results)

    arg_to_fut = {}   # argument -> future created (or reused) for it
    children = []     # one entry per positional argument, in call order
    nfuts = 0         # number of distinct futures being waited on
    nfinished = 0     # how many of those have completed so far
    for arg in coros_or_futures:
        if arg not in arg_to_fut:
            fut = ensure_future(arg, loop=loop)
            if loop is None:
                # Adopt the loop of the first future for the rest.
                loop = futures._get_loop(fut)
            if fut is not arg:
                # 'arg' was not a Future, therefore, 'fut' is a new
                # Future created specifically for 'arg'.  Since the caller
                # can't control it, disable the "destroy pending task"
                # warning.
                fut._log_destroy_pending = False

            nfuts += 1
            arg_to_fut[arg] = fut
            fut.add_done_callback(_done_callback)

        else:
            # There's a duplicate Future object in coros_or_futures.
            fut = arg_to_fut[arg]

        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    return outer
828
829
def shield(arg, *, loop=None):
    """Wait for a future, shielding it from cancellation.

    The statement

        res = await shield(something())

    is exactly equivalent to the statement

        res = await something()

    *except* that if the coroutine containing it is cancelled, the
    task running in something() is not cancelled.  From the POV of
    something(), the cancellation did not happen.  But its caller is
    still cancelled, so the await expression still raises
    CancelledError.  Note: If something() is cancelled by other means
    this will still cancel shield().

    If you want to completely ignore cancellation (not recommended)
    you can combine shield() with a try/except clause, as follows:

        try:
            res = await shield(something())
        except CancelledError:
            res = None
    """
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Already finished: there is nothing left to protect.
        return inner

    loop = futures._get_loop(inner)
    outer = loop.create_future()

    def _inner_done_callback(fut):
        if outer.cancelled():
            # The outer future is already cancelled, so the outcome is
            # not forwarded; just mark inner's result as retrieved.
            if not fut.cancelled():
                fut.exception()
            return

        if fut.cancelled():
            outer.cancel()
            return

        exc = fut.exception()
        if exc is None:
            outer.set_result(fut.result())
        else:
            outer.set_exception(exc)

    def _outer_done_callback(_):
        # Once the outer future is finished, stop listening to an inner
        # future that is still running.
        if not inner.done():
            inner.remove_done_callback(_inner_done_callback)

    inner.add_done_callback(_inner_done_callback)
    outer.add_done_callback(_outer_done_callback)
    return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700891
892
def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    The coroutine is scheduled through loop.call_soon_threadsafe() and a
    concurrent.futures.Future is returned to access the result.

    Raises TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')

    result_future = concurrent.futures.Future()

    def callback():
        try:
            task = ensure_future(coro, loop=loop)
            futures._chain_future(task, result_future)
        except (SystemExit, KeyboardInterrupt):
            # Let interpreter-exit requests pass through untouched.
            raise
        except BaseException as exc:
            # Report the failure to whoever holds the returned future,
            # then let it propagate as well.
            if result_future.set_running_or_notify_cancel():
                result_future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return result_future
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200914
915
# WeakSet containing all alive tasks.  _register_task() adds to it and
# _unregister_task() discards from it; being a WeakSet, it does not keep
# the tasks alive by itself.
_all_tasks = weakref.WeakSet()

# Dictionary containing tasks that are currently active in
# all running event loops.  {EventLoop: Task}
# _enter_task() stores the entry for a loop and _leave_task() deletes it.
_current_tasks = {}
922
923
def _register_task(task):
    """Register a task by adding it to the module-level _all_tasks set."""
    _all_tasks.add(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200927
928
def _enter_task(loop, task):
    """Record *task* as the task currently executing on *loop*.

    Raises RuntimeError if another task is already recorded for *loop*.
    """
    current_task = _current_tasks.get(loop)
    if current_task is None:
        _current_tasks[loop] = task
        return
    raise RuntimeError(f"Cannot enter into task {task!r} while another "
                       f"task {current_task!r} is being executed.")
935
936
def _leave_task(loop, task):
    """Clear the current-task record of *loop*, which must be *task*.

    Raises RuntimeError if the task recorded for *loop* is not *task*.
    """
    current_task = _current_tasks.get(loop)
    if current_task is task:
        del _current_tasks[loop]
        return
    raise RuntimeError(f"Leaving task {task!r} does not match "
                       f"the current task {current_task!r}.")
943
944
def _unregister_task(task):
    """Unregister a task by discarding it from the _all_tasks set.

    discard() is used, so a task that is not in the set is not an error.
    """
    _all_tasks.discard(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200948
949
# Keep the definitions above reachable under _py_* names: the import
# below may rebind the plain names.
_py_register_task = _register_task
_py_unregister_task = _unregister_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task


try:
    # When the _asyncio module is importable, its versions of the helpers
    # and of both registries replace the ones defined in this file.
    # NOTE(review): the _c_* aliases below suggest _asyncio is the C
    # implementation -- confirm.
    from _asyncio import (_register_task, _unregister_task,
                          _enter_task, _leave_task,
                          _all_tasks, _current_tasks)
except ImportError:
    # _asyncio is unavailable: the definitions above stay in effect.
    pass
else:
    # Also expose the imported helpers under explicit _c_* names.
    _c_register_task = _register_task
    _c_unregister_task = _unregister_task
    _c_enter_task = _enter_task
    _c_leave_task = _leave_task