blob: 52f1e6629e2fc67e193b51e2497fabb4aea63dbe [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
Yury Selivanov6370f342017-12-10 18:36:12 -05003__all__ = (
Andrew Svetlovf74ef452017-12-15 07:04:38 +02004 'Task', 'create_task',
Yury Selivanov6370f342017-12-10 18:36:12 -05005 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
Yury Selivanov9edad3c2017-12-11 10:03:48 -05006 'wait', 'wait_for', 'as_completed', 'sleep',
Yury Selivanov6370f342017-12-10 18:36:12 -05007 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
Andrew Svetlov44d1a592017-12-16 21:58:38 +02008 'current_task', 'all_tasks',
9 '_register_task', '_unregister_task', '_enter_task', '_leave_task',
Yury Selivanov6370f342017-12-10 18:36:12 -050010)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import concurrent.futures
Yury Selivanovf23746a2018-01-22 19:11:18 -050013import contextvars
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import functools
15import inspect
Alex Grönholmcca4eec2018-08-09 00:06:47 +030016import itertools
Andrew Svetlov5f841b52017-12-09 00:23:48 +020017import types
Yury Selivanov59eb9a42015-05-11 14:48:38 -040018import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019import weakref
20
Yury Selivanova0c1ba62016-10-28 12:52:37 -040021from . import base_tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020022from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070024from . import exceptions
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025from . import futures
Andrew Svetlov68b34a72019-05-16 17:52:10 +030026from .coroutines import _is_coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027
Alex Grönholmcca4eec2018-08-09 00:06:47 +030028# Helper to generate new task names
29# This uses itertools.count() instead of a "+= 1" operation because the latter
30# is not thread safe. See bpo-11866 for a longer explanation.
31_task_name_counter = itertools.count(1).__next__
32
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033
Andrew Svetlov44d1a592017-12-16 21:58:38 +020034def current_task(loop=None):
35 """Return a currently executed task."""
36 if loop is None:
37 loop = events.get_running_loop()
38 return _current_tasks.get(loop)
39
40
41def all_tasks(loop=None):
42 """Return a set of all tasks for the loop."""
43 if loop is None:
Yury Selivanov416c1eb2018-05-28 17:54:02 -040044 loop = events.get_running_loop()
Andrew Svetlov65aa64f2019-06-11 18:27:30 +030045 # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
46 # thread while we do so. Therefore we cast it to list prior to filtering. The list
47 # cast itself requires iteration, so we repeat it several times ignoring
48 # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
49 # details.
50 i = 0
51 while True:
52 try:
53 tasks = list(_all_tasks)
54 except RuntimeError:
55 i += 1
56 if i >= 1000:
57 raise
58 else:
59 break
60 return {t for t in tasks
Yury Selivanov416c1eb2018-05-28 17:54:02 -040061 if futures._get_loop(t) is loop and not t.done()}
62
63
Alex Grönholmcca4eec2018-08-09 00:06:47 +030064def _set_task_name(task, name):
65 if name is not None:
66 try:
67 set_name = task.set_name
68 except AttributeError:
69 pass
70 else:
71 set_name(name)
72
73
Yury Selivanov0cf16f92017-12-25 10:48:15 -050074class Task(futures._PyFuture): # Inherit Python Task implementation
75 # from a Python Future implementation.
76
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070077 """A coroutine wrapped in a Future."""
78
79 # An important invariant maintained while a Task not done:
80 #
81 # - Either _fut_waiter is None, and _step() is scheduled;
82 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
83 #
84 # The only transition from the latter to the former is through
85 # _wakeup(). When _fut_waiter is not None, one of its callbacks
86 # must be _wakeup().
87
Victor Stinnerfe22e092014-12-04 23:00:13 +010088 # If False, don't log a message if the task is destroyed whereas its
89 # status is still pending
90 _log_destroy_pending = True
91
Alex Grönholmcca4eec2018-08-09 00:06:47 +030092 def __init__(self, coro, *, loop=None, name=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070093 super().__init__(loop=loop)
Victor Stinner80f53aa2014-06-27 13:52:20 +020094 if self._source_traceback:
95 del self._source_traceback[-1]
Andrew Svetlovf74ef452017-12-15 07:04:38 +020096 if not coroutines.iscoroutine(coro):
97 # raise after Future.__init__(), attrs are required for __del__
98 # prevent logging for pending task in __del__
99 self._log_destroy_pending = False
100 raise TypeError(f"a coroutine was expected, got {coro!r}")
101
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300102 if name is None:
103 self._name = f'Task-{_task_name_counter()}'
104 else:
105 self._name = str(name)
106
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700107 self._must_cancel = False
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200108 self._fut_waiter = None
109 self._coro = coro
Yury Selivanovf23746a2018-01-22 19:11:18 -0500110 self._context = contextvars.copy_context()
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200111
Yury Selivanov22feeb82018-01-24 11:31:01 -0500112 self._loop.call_soon(self.__step, context=self._context)
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500113 _register_task(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700114
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900115 def __del__(self):
116 if self._state == futures._PENDING and self._log_destroy_pending:
117 context = {
118 'task': self,
119 'message': 'Task was destroyed but it is pending!',
120 }
121 if self._source_traceback:
122 context['source_traceback'] = self._source_traceback
123 self._loop.call_exception_handler(context)
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500124 super().__del__()
Victor Stinnera02f81f2014-06-24 22:37:53 +0200125
Batuhan Taşkayadec36722019-12-07 14:05:07 +0300126 def __class_getitem__(cls, type):
127 return cls
128
Victor Stinner313a9802014-07-29 12:58:23 +0200129 def _repr_info(self):
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400130 return base_tasks._task_repr_info(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700131
Alex Grönholm98ef9202019-05-30 18:30:09 +0300132 def get_coro(self):
133 return self._coro
134
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300135 def get_name(self):
136 return self._name
137
138 def set_name(self, value):
139 self._name = str(value)
140
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500141 def set_result(self, result):
142 raise RuntimeError('Task does not support set_result operation')
143
144 def set_exception(self, exception):
145 raise RuntimeError('Task does not support set_exception operation')
146
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700147 def get_stack(self, *, limit=None):
148 """Return the list of stack frames for this task's coroutine.
149
Victor Stinnerd87de832014-12-02 17:57:04 +0100150 If the coroutine is not done, this returns the stack where it is
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700151 suspended. If the coroutine has completed successfully or was
152 cancelled, this returns an empty list. If the coroutine was
153 terminated by an exception, this returns the list of traceback
154 frames.
155
156 The frames are always ordered from oldest to newest.
157
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500158 The optional limit gives the maximum number of frames to
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700159 return; by default all available frames are returned. Its
160 meaning differs depending on whether a stack or a traceback is
161 returned: the newest frames of a stack are returned, but the
162 oldest frames of a traceback are returned. (This matches the
163 behavior of the traceback module.)
164
165 For reasons beyond our control, only one stack frame is
166 returned for a suspended coroutine.
167 """
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400168 return base_tasks._task_get_stack(self, limit)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700169
170 def print_stack(self, *, limit=None, file=None):
171 """Print the stack or traceback for this task's coroutine.
172
173 This produces output similar to that of the traceback module,
174 for the frames retrieved by get_stack(). The limit argument
175 is passed to get_stack(). The file argument is an I/O stream
R David Murray8e069d52014-09-24 13:13:45 -0400176 to which the output is written; by default output is written
177 to sys.stderr.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700178 """
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400179 return base_tasks._task_print_stack(self, limit, file)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700180
Chris Jerdonek1ce58412020-05-15 16:55:50 -0700181 def cancel(self, msg=None):
R David Murray8e069d52014-09-24 13:13:45 -0400182 """Request that this task cancel itself.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200183
Victor Stinner8d213572014-06-02 23:06:46 +0200184 This arranges for a CancelledError to be thrown into the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200185 wrapped coroutine on the next cycle through the event loop.
186 The coroutine then has a chance to clean up or even deny
187 the request using try/except/finally.
188
R David Murray8e069d52014-09-24 13:13:45 -0400189 Unlike Future.cancel, this does not guarantee that the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200190 task will be cancelled: the exception might be caught and
R David Murray8e069d52014-09-24 13:13:45 -0400191 acted upon, delaying cancellation of the task or preventing
192 cancellation completely. The task may also return a value or
193 raise a different exception.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200194
195 Immediately after this method is called, Task.cancelled() will
196 not return True (unless the task was already cancelled). A
197 task will be marked as cancelled when the wrapped coroutine
198 terminates with a CancelledError exception (even if cancel()
199 was not called).
200 """
Yury Selivanov7ce1c6f2017-06-11 13:49:18 +0000201 self._log_traceback = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700202 if self.done():
203 return False
204 if self._fut_waiter is not None:
Chris Jerdonek1ce58412020-05-15 16:55:50 -0700205 if self._fut_waiter.cancel(msg=msg):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700206 # Leave self._fut_waiter; it may be a Task that
207 # catches and ignores the cancellation so we may have
208 # to cancel it again later.
209 return True
Yury Selivanov22feeb82018-01-24 11:31:01 -0500210 # It must be the case that self.__step is already scheduled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700211 self._must_cancel = True
Chris Jerdonek1ce58412020-05-15 16:55:50 -0700212 self._cancel_message = msg
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700213 return True
214
Yury Selivanov22feeb82018-01-24 11:31:01 -0500215 def __step(self, exc=None):
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500216 if self.done():
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700217 raise exceptions.InvalidStateError(
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500218 f'_step(): already done: {self!r}, {exc!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700219 if self._must_cancel:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700220 if not isinstance(exc, exceptions.CancelledError):
Chris Jerdonekda742ba2020-05-17 22:47:31 -0700221 exc = self._make_cancelled_error()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700222 self._must_cancel = False
223 coro = self._coro
224 self._fut_waiter = None
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800225
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200226 _enter_task(self._loop, self)
Yury Selivanovd59bba82015-11-20 12:41:03 -0500227 # Call either coro.throw(exc) or coro.send(None).
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700228 try:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500229 if exc is None:
230 # We use the `send` method directly, because coroutines
231 # don't have `__iter__` and `__next__` methods.
232 result = coro.send(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700233 else:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500234 result = coro.throw(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700235 except StopIteration as exc:
INADA Naoki991adca2017-05-11 21:18:38 +0900236 if self._must_cancel:
237 # Task is cancelled right before coro stops.
238 self._must_cancel = False
Chris Jerdonek1ce58412020-05-15 16:55:50 -0700239 super().cancel(msg=self._cancel_message)
INADA Naoki991adca2017-05-11 21:18:38 +0900240 else:
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500241 super().set_result(exc.value)
Chris Jerdonek1ce58412020-05-15 16:55:50 -0700242 except exceptions.CancelledError as exc:
Chris Jerdonekda742ba2020-05-17 22:47:31 -0700243 # Save the original exception so we can chain it later.
244 self._cancelled_exc = exc
245 super().cancel() # I.e., Future.cancel(self).
Yury Selivanov431b5402019-05-27 14:45:12 +0200246 except (KeyboardInterrupt, SystemExit) as exc:
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500247 super().set_exception(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700248 raise
Yury Selivanov431b5402019-05-27 14:45:12 +0200249 except BaseException as exc:
250 super().set_exception(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700251 else:
Guido van Rossum1140a032016-09-09 12:54:54 -0700252 blocking = getattr(result, '_asyncio_future_blocking', None)
253 if blocking is not None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700254 # Yielded Future must come from Future.__iter__().
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500255 if futures._get_loop(result) is not self._loop:
Yury Selivanov6370f342017-12-10 18:36:12 -0500256 new_exc = RuntimeError(
257 f'Task {self!r} got Future '
258 f'{result!r} attached to a different loop')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500259 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500260 self.__step, new_exc, context=self._context)
Guido van Rossum1140a032016-09-09 12:54:54 -0700261 elif blocking:
Yury Selivanov4145c832016-10-09 12:19:12 -0400262 if result is self:
Yury Selivanov6370f342017-12-10 18:36:12 -0500263 new_exc = RuntimeError(
264 f'Task cannot await on itself: {self!r}')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500265 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500266 self.__step, new_exc, context=self._context)
Yury Selivanov4145c832016-10-09 12:19:12 -0400267 else:
268 result._asyncio_future_blocking = False
Yury Selivanovf23746a2018-01-22 19:11:18 -0500269 result.add_done_callback(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500270 self.__wakeup, context=self._context)
Yury Selivanov4145c832016-10-09 12:19:12 -0400271 self._fut_waiter = result
272 if self._must_cancel:
Chris Jerdonek1ce58412020-05-15 16:55:50 -0700273 if self._fut_waiter.cancel(
274 msg=self._cancel_message):
Yury Selivanov4145c832016-10-09 12:19:12 -0400275 self._must_cancel = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700276 else:
Yury Selivanov6370f342017-12-10 18:36:12 -0500277 new_exc = RuntimeError(
278 f'yield was used instead of yield from '
279 f'in task {self!r} with {result!r}')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500280 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500281 self.__step, new_exc, context=self._context)
Yury Selivanov6370f342017-12-10 18:36:12 -0500282
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700283 elif result is None:
284 # Bare yield relinquishes control for one event loop iteration.
Yury Selivanov22feeb82018-01-24 11:31:01 -0500285 self._loop.call_soon(self.__step, context=self._context)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700286 elif inspect.isgenerator(result):
287 # Yielding a generator is just wrong.
Yury Selivanov6370f342017-12-10 18:36:12 -0500288 new_exc = RuntimeError(
289 f'yield was used instead of yield from for '
Serhiy Storchaka66553542018-05-20 16:30:31 +0300290 f'generator in task {self!r} with {result!r}')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500291 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500292 self.__step, new_exc, context=self._context)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700293 else:
294 # Yielding something else is an error.
Yury Selivanov6370f342017-12-10 18:36:12 -0500295 new_exc = RuntimeError(f'Task got bad yield: {result!r}')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500296 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500297 self.__step, new_exc, context=self._context)
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800298 finally:
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200299 _leave_task(self._loop, self)
Victor Stinnerd74ac822014-03-04 23:07:08 +0100300 self = None # Needed to break cycles when an exception occurs.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700301
Yury Selivanov22feeb82018-01-24 11:31:01 -0500302 def __wakeup(self, future):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700303 try:
Yury Selivanova4afc482015-11-16 15:12:10 -0500304 future.result()
Yury Selivanov431b5402019-05-27 14:45:12 +0200305 except BaseException as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700306 # This may also be a cancellation.
Yury Selivanov22feeb82018-01-24 11:31:01 -0500307 self.__step(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700308 else:
Yury Selivanova4afc482015-11-16 15:12:10 -0500309 # Don't pass the value of `future.result()` explicitly,
310 # as `Future.__iter__` and `Future.__await__` don't need it.
311 # If we call `_step(value, None)` instead of `_step()`,
312 # Python eval loop would use `.send(value)` method call,
313 # instead of `__next__()`, which is slower for futures
314 # that return non-generator iterators from their `__iter__`.
Yury Selivanov22feeb82018-01-24 11:31:01 -0500315 self.__step()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700316 self = None # Needed to break cycles when an exception occurs.
317
318
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400319_PyTask = Task
320
321
322try:
323 import _asyncio
324except ImportError:
325 pass
326else:
327 # _CTask is needed for tests.
328 Task = _CTask = _asyncio.Task
329
330
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300331def create_task(coro, *, name=None):
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200332 """Schedule the execution of a coroutine object in a spawn task.
333
334 Return a Task object.
335 """
336 loop = events.get_running_loop()
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300337 task = loop.create_task(coro)
338 _set_task_name(task, name)
339 return task
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200340
341
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700342# wait() and as_completed() similar to those in PEP 3148.
343
344FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
345FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
346ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
347
348
Yurii Karabase4fe3032020-11-28 10:21:17 +0200349async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700350 """Wait for the Futures and coroutines given by fs to complete.
351
Jakub Stasiak3d86d092020-11-02 11:56:35 +0100352 The fs iterable must not be empty.
Victor Stinnerdb74d982014-06-10 11:16:05 +0200353
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700354 Coroutines will be wrapped in Tasks.
355
356 Returns two sets of Future: (done, pending).
357
358 Usage:
359
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200360 done, pending = await asyncio.wait(fs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700361
362 Note: This does not raise TimeoutError! Futures that aren't done
363 when the timeout occurs are returned in the second set.
364 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700365 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Yury Selivanov6370f342017-12-10 18:36:12 -0500366 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700367 if not fs:
368 raise ValueError('Set of coroutines/Futures is empty.')
Victor Stinnere931f7b2014-07-16 18:50:39 +0200369 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
Yury Selivanov6370f342017-12-10 18:36:12 -0500370 raise ValueError(f'Invalid return_when value: {return_when}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700371
Yurii Karabase4fe3032020-11-28 10:21:17 +0200372 loop = events.get_running_loop()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700373
Diogo Dutra7e5ef0a2020-11-10 19:12:52 -0300374 fs = set(fs)
375
376 if any(coroutines.iscoroutine(f) for f in fs):
Kyle Stanley89aa7f02019-12-30 06:50:19 -0500377 warnings.warn("The explicit passing of coroutine objects to "
378 "asyncio.wait() is deprecated since Python 3.8, and "
379 "scheduled for removal in Python 3.11.",
380 DeprecationWarning, stacklevel=2)
381
Diogo Dutra7e5ef0a2020-11-10 19:12:52 -0300382 fs = {ensure_future(f, loop=loop) for f in fs}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700383
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200384 return await _wait(fs, timeout, return_when, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700385
386
Victor Stinner59e08022014-08-28 11:19:25 +0200387def _release_waiter(waiter, *args):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700388 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200389 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700390
391
Yurii Karabase4fe3032020-11-28 10:21:17 +0200392async def wait_for(fut, timeout):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700393 """Wait for the single Future or coroutine to complete, with timeout.
394
395 Coroutine will be wrapped in Task.
396
Victor Stinner421e49b2014-01-23 17:40:59 +0100397 Returns result of the Future or coroutine. When a timeout occurs,
398 it cancels the task and raises TimeoutError. To avoid the task
399 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700400
Victor Stinner922bc2c2015-01-15 16:29:10 +0100401 If the wait is cancelled, the task is also cancelled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700402
Victor Stinner922bc2c2015-01-15 16:29:10 +0100403 This function is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700404 """
Yurii Karabase4fe3032020-11-28 10:21:17 +0200405 loop = events.get_running_loop()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700406
Guido van Rossum48c66c32014-01-29 14:30:38 -0800407 if timeout is None:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200408 return await fut
Guido van Rossum48c66c32014-01-29 14:30:38 -0800409
Victor K4d071892017-10-05 19:04:39 +0300410 if timeout <= 0:
411 fut = ensure_future(fut, loop=loop)
412
413 if fut.done():
414 return fut.result()
415
Elvis Pranskevichusc517fc72020-08-26 09:42:22 -0700416 await _cancel_and_wait(fut, loop=loop)
417 try:
418 fut.result()
419 except exceptions.CancelledError as exc:
420 raise exceptions.TimeoutError() from exc
421 else:
422 raise exceptions.TimeoutError()
Victor K4d071892017-10-05 19:04:39 +0300423
Yury Selivanov7661db62016-05-16 15:38:39 -0400424 waiter = loop.create_future()
Victor Stinner59e08022014-08-28 11:19:25 +0200425 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
426 cb = functools.partial(_release_waiter, waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700427
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400428 fut = ensure_future(fut, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700429 fut.add_done_callback(cb)
430
431 try:
Victor Stinner59e08022014-08-28 11:19:25 +0200432 # wait until the future completes or the timeout
Victor Stinner922bc2c2015-01-15 16:29:10 +0100433 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200434 await waiter
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700435 except exceptions.CancelledError:
Elvis Pranskevichusa2118a12020-08-26 09:42:45 -0700436 if fut.done():
437 return fut.result()
438 else:
439 fut.remove_done_callback(cb)
Richard Kojedzinszky17ef4312020-12-18 18:26:04 +0100440 # We must ensure that the task is not running
441 # after wait_for() returns.
442 # See https://bugs.python.org/issue32751
443 await _cancel_and_wait(fut, loop=loop)
Elvis Pranskevichusa2118a12020-08-26 09:42:45 -0700444 raise
Victor Stinner59e08022014-08-28 11:19:25 +0200445
446 if fut.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700447 return fut.result()
448 else:
449 fut.remove_done_callback(cb)
Elvis Pranskevichuse2b340a2018-05-29 17:31:01 -0400450 # We must ensure that the task is not running
451 # after wait_for() returns.
452 # See https://bugs.python.org/issue32751
453 await _cancel_and_wait(fut, loop=loop)
romasku382a5632020-05-15 23:12:05 +0300454 # In case task cancellation failed with some
455 # exception, we should re-raise it
456 # See https://bugs.python.org/issue40607
457 try:
458 fut.result()
459 except exceptions.CancelledError as exc:
460 raise exceptions.TimeoutError() from exc
461 else:
462 raise exceptions.TimeoutError()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700463 finally:
464 timeout_handle.cancel()
465
466
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200467async def _wait(fs, timeout, return_when, loop):
Elvis Pranskevichuse2b340a2018-05-29 17:31:01 -0400468 """Internal helper for wait().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700469
470 The fs argument must be a collection of Futures.
471 """
472 assert fs, 'Set of Futures is empty.'
Yury Selivanov7661db62016-05-16 15:38:39 -0400473 waiter = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700474 timeout_handle = None
475 if timeout is not None:
476 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
477 counter = len(fs)
478
479 def _on_completion(f):
480 nonlocal counter
481 counter -= 1
482 if (counter <= 0 or
483 return_when == FIRST_COMPLETED or
484 return_when == FIRST_EXCEPTION and (not f.cancelled() and
485 f.exception() is not None)):
486 if timeout_handle is not None:
487 timeout_handle.cancel()
488 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200489 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700490
491 for f in fs:
492 f.add_done_callback(_on_completion)
493
494 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200495 await waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700496 finally:
497 if timeout_handle is not None:
498 timeout_handle.cancel()
gescheitc1964e92019-05-03 18:18:02 +0300499 for f in fs:
500 f.remove_done_callback(_on_completion)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700501
502 done, pending = set(), set()
503 for f in fs:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700504 if f.done():
505 done.add(f)
506 else:
507 pending.add(f)
508 return done, pending
509
510
Elvis Pranskevichuse2b340a2018-05-29 17:31:01 -0400511async def _cancel_and_wait(fut, loop):
512 """Cancel the *fut* future or task and wait until it completes."""
513
514 waiter = loop.create_future()
515 cb = functools.partial(_release_waiter, waiter)
516 fut.add_done_callback(cb)
517
518 try:
519 fut.cancel()
520 # We cannot wait on *fut* directly to make
521 # sure _cancel_and_wait itself is reliably cancellable.
522 await waiter
523 finally:
524 fut.remove_done_callback(cb)
525
526
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700527# This is *not* a @coroutine! It is just an iterator (yielding Futures).
Yurii Karabase4fe3032020-11-28 10:21:17 +0200528def as_completed(fs, *, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800529 """Return an iterator whose values are coroutines.
530
531 When waiting for the yielded coroutines you'll get the results (or
532 exceptions!) of the original Futures (or coroutines), in the order
533 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700534
535 This differs from PEP 3148; the proper way to use this is:
536
537 for f in as_completed(fs):
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200538 result = await f # The 'await' may raise.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700539 # Use result.
540
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200541 If a timeout is specified, the 'await' will raise
Guido van Rossumb58f0532014-02-12 17:58:19 -0800542 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700543
544 Note: The futures 'f' are not necessarily members of fs.
545 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700546 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Jakub Stasiak3d86d092020-11-02 11:56:35 +0100547 raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
Andrew Svetlova4888792019-09-12 15:40:40 +0300548
Guido van Rossumb58f0532014-02-12 17:58:19 -0800549 from .queues import Queue # Import here to avoid circular import problem.
Yurii Karabas0ec34ca2020-11-24 20:08:54 +0200550 done = Queue()
Andrew Svetlova4888792019-09-12 15:40:40 +0300551
Yurii Karabase4fe3032020-11-28 10:21:17 +0200552 loop = events.get_event_loop()
Andrew Svetlova4888792019-09-12 15:40:40 +0300553 todo = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800554 timeout_handle = None
555
556 def _on_timeout():
557 for f in todo:
558 f.remove_done_callback(_on_completion)
559 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
560 todo.clear() # Can't do todo.remove(f) in the loop.
561
562 def _on_completion(f):
563 if not todo:
564 return # _on_timeout() was here first.
565 todo.remove(f)
566 done.put_nowait(f)
567 if not todo and timeout_handle is not None:
568 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700569
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200570 async def _wait_for_one():
571 f = await done.get()
Guido van Rossumb58f0532014-02-12 17:58:19 -0800572 if f is None:
573 # Dummy value from _on_timeout().
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700574 raise exceptions.TimeoutError
Guido van Rossumb58f0532014-02-12 17:58:19 -0800575 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700576
Guido van Rossumb58f0532014-02-12 17:58:19 -0800577 for f in todo:
578 f.add_done_callback(_on_completion)
579 if todo and timeout is not None:
580 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700581 for _ in range(len(todo)):
582 yield _wait_for_one()
583
584
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200585@types.coroutine
586def __sleep0():
587 """Skip one event loop run cycle.
588
589 This is a private helper for 'asyncio.sleep()', used
590 when the 'delay' is set to 0. It uses a bare 'yield'
Yury Selivanov22feeb82018-01-24 11:31:01 -0500591 expression (which Task.__step knows how to handle)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200592 instead of creating a Future object.
593 """
594 yield
595
596
Yurii Karabase4fe3032020-11-28 10:21:17 +0200597async def sleep(delay, result=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700598 """Coroutine that completes after a given time (in seconds)."""
Andrew Svetlov5382c052017-12-17 16:41:30 +0200599 if delay <= 0:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200600 await __sleep0()
Yury Selivanovade04122015-11-05 14:29:04 -0500601 return result
602
Yurii Karabase4fe3032020-11-28 10:21:17 +0200603 loop = events.get_running_loop()
Yury Selivanov7661db62016-05-16 15:38:39 -0400604 future = loop.create_future()
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500605 h = loop.call_later(delay,
606 futures._set_result_unless_cancelled,
607 future, result)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700608 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200609 return await future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700610 finally:
611 h.cancel()
612
613
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400614def ensure_future(coro_or_future, *, loop=None):
Yury Selivanov620279b2015-10-02 15:00:19 -0400615 """Wrap a coroutine or an awaitable in a future.
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400616
617 If the argument is a Future, it is returned directly.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700618 """
jimmylaie549c4b2018-05-28 06:42:05 -1000619 if coroutines.iscoroutine(coro_or_future):
Victor Stinner896a25a2014-07-08 11:29:25 +0200620 if loop is None:
621 loop = events.get_event_loop()
622 task = loop.create_task(coro_or_future)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200623 if task._source_traceback:
624 del task._source_traceback[-1]
625 return task
jimmylaie549c4b2018-05-28 06:42:05 -1000626 elif futures.isfuture(coro_or_future):
627 if loop is not None and loop is not futures._get_loop(coro_or_future):
Zackery Spytz4737b922019-05-03 09:35:26 -0600628 raise ValueError('The future belongs to a different loop than '
629 'the one specified as the loop argument')
jimmylaie549c4b2018-05-28 06:42:05 -1000630 return coro_or_future
Victor Stinner3f438a92017-11-28 14:43:52 +0100631 elif inspect.isawaitable(coro_or_future):
Yury Selivanov620279b2015-10-02 15:00:19 -0400632 return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700633 else:
Charles Renwickae5b3262017-04-21 16:49:48 -0400634 raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
635 'required')
Yury Selivanov620279b2015-10-02 15:00:19 -0400636
637
Andrew Svetlov68b34a72019-05-16 17:52:10 +0300638@types.coroutine
Yury Selivanov620279b2015-10-02 15:00:19 -0400639def _wrap_awaitable(awaitable):
640 """Helper for asyncio.ensure_future().
641
642 Wraps awaitable (an object with __await__) into a coroutine
643 that will later be wrapped in a Task by ensure_future().
644 """
645 return (yield from awaitable.__await__())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700646
Andrew Svetlov68b34a72019-05-16 17:52:10 +0300647_wrap_awaitable._is_coroutine = _is_coroutine
648
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700649
650class _GatheringFuture(futures.Future):
651 """Helper for gather().
652
653 This overrides cancel() to cancel all the children and act more
654 like Task.cancel(), which doesn't immediately mark itself as
655 cancelled.
656 """
657
658 def __init__(self, children, *, loop=None):
659 super().__init__(loop=loop)
660 self._children = children
Yury Selivanov863b6742018-05-29 17:20:02 -0400661 self._cancel_requested = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700662
Chris Jerdonek1ce58412020-05-15 16:55:50 -0700663 def cancel(self, msg=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700664 if self.done():
665 return False
Yury Selivanov3d676152016-10-21 17:22:17 -0400666 ret = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700667 for child in self._children:
Chris Jerdonek1ce58412020-05-15 16:55:50 -0700668 if child.cancel(msg=msg):
Yury Selivanov3d676152016-10-21 17:22:17 -0400669 ret = True
Yury Selivanov863b6742018-05-29 17:20:02 -0400670 if ret:
671 # If any child tasks were actually cancelled, we should
672 # propagate the cancellation request regardless of
673 # *return_exceptions* argument. See issue 32684.
674 self._cancel_requested = True
Yury Selivanov3d676152016-10-21 17:22:17 -0400675 return ret
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700676
677
Yurii Karabase4fe3032020-11-28 10:21:17 +0200678def gather(*coros_or_futures, return_exceptions=False):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500679 """Return a future aggregating results from the given coroutines/futures.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700680
Guido van Rossume3c65a72016-09-30 08:17:15 -0700681 Coroutines will be wrapped in a future and scheduled in the event
682 loop. They will not necessarily be scheduled in the same order as
683 passed in.
684
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700685 All futures must share the same event loop. If all the tasks are
686 done successfully, the returned future's result is the list of
687 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500688 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700689 exceptions in the tasks are treated the same as successful
690 results, and gathered in the result list; otherwise, the first
691 raised exception will be immediately propagated to the returned
692 future.
693
694 Cancellation: if the outer Future is cancelled, all children (that
695 have not completed yet) are also cancelled. If any child is
696 cancelled, this is treated as if it raised CancelledError --
697 the outer Future is *not* cancelled in this case. (This is to
698 prevent the cancellation of one child to cause other children to
699 be cancelled.)
Vinay Sharmad42528a2020-07-20 14:12:57 +0530700
701 If *return_exceptions* is False, cancelling gather() after it
702 has been marked done won't cancel any submitted awaitables.
703 For instance, gather can be marked done after propagating an
704 exception to the caller, therefore, calling ``gather.cancel()``
705 after catching an exception (raised by one of the awaitables) from
706 gather won't cancel any other awaitables.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700707 """
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200708 if not coros_or_futures:
Yurii Karabase4fe3032020-11-28 10:21:17 +0200709 loop = events.get_event_loop()
Yury Selivanov7661db62016-05-16 15:38:39 -0400710 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700711 outer.set_result([])
712 return outer
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200713
Yury Selivanov36c2c042017-12-19 07:19:53 -0500714 def _done_callback(fut):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700715 nonlocal nfinished
Yury Selivanov36c2c042017-12-19 07:19:53 -0500716 nfinished += 1
717
Victor Stinner3531d902015-01-09 01:42:52 +0100718 if outer.done():
719 if not fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700720 # Mark exception retrieved.
721 fut.exception()
722 return
Victor Stinner3531d902015-01-09 01:42:52 +0100723
Yury Selivanov36c2c042017-12-19 07:19:53 -0500724 if not return_exceptions:
725 if fut.cancelled():
726 # Check if 'fut' is cancelled first, as
727 # 'fut.exception()' will *raise* a CancelledError
728 # instead of returning it.
Chris Jerdonekda742ba2020-05-17 22:47:31 -0700729 exc = fut._make_cancelled_error()
Yury Selivanov36c2c042017-12-19 07:19:53 -0500730 outer.set_exception(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700731 return
Yury Selivanov36c2c042017-12-19 07:19:53 -0500732 else:
733 exc = fut.exception()
734 if exc is not None:
735 outer.set_exception(exc)
736 return
737
738 if nfinished == nfuts:
739 # All futures are done; create a list of results
740 # and set it to the 'outer' future.
741 results = []
742
743 for fut in children:
744 if fut.cancelled():
Chris Jerdonekda742ba2020-05-17 22:47:31 -0700745 # Check if 'fut' is cancelled first, as 'fut.exception()'
746 # will *raise* a CancelledError instead of returning it.
747 # Also, since we're adding the exception return value
748 # to 'results' instead of raising it, don't bother
749 # setting __context__. This also lets us preserve
750 # calling '_make_cancelled_error()' at most once.
Chris Jerdonek1ce58412020-05-15 16:55:50 -0700751 res = exceptions.CancelledError(
752 '' if fut._cancel_message is None else
753 fut._cancel_message)
Yury Selivanov36c2c042017-12-19 07:19:53 -0500754 else:
755 res = fut.exception()
756 if res is None:
757 res = fut.result()
758 results.append(res)
759
Yury Selivanov863b6742018-05-29 17:20:02 -0400760 if outer._cancel_requested:
761 # If gather is being cancelled we must propagate the
762 # cancellation regardless of *return_exceptions* argument.
763 # See issue 32684.
Chris Jerdonekda742ba2020-05-17 22:47:31 -0700764 exc = fut._make_cancelled_error()
Chris Jerdonek1ce58412020-05-15 16:55:50 -0700765 outer.set_exception(exc)
Yury Selivanov863b6742018-05-29 17:20:02 -0400766 else:
767 outer.set_result(results)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700768
Yury Selivanov36c2c042017-12-19 07:19:53 -0500769 arg_to_fut = {}
770 children = []
771 nfuts = 0
772 nfinished = 0
Yurii Karabase4fe3032020-11-28 10:21:17 +0200773 loop = None
Yury Selivanov36c2c042017-12-19 07:19:53 -0500774 for arg in coros_or_futures:
775 if arg not in arg_to_fut:
776 fut = ensure_future(arg, loop=loop)
777 if loop is None:
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500778 loop = futures._get_loop(fut)
Yury Selivanov36c2c042017-12-19 07:19:53 -0500779 if fut is not arg:
780 # 'arg' was not a Future, therefore, 'fut' is a new
781 # Future created specifically for 'arg'. Since the caller
782 # can't control it, disable the "destroy pending task"
783 # warning.
784 fut._log_destroy_pending = False
785
786 nfuts += 1
787 arg_to_fut[arg] = fut
788 fut.add_done_callback(_done_callback)
789
790 else:
791 # There's a duplicate Future object in coros_or_futures.
792 fut = arg_to_fut[arg]
793
794 children.append(fut)
795
796 outer = _GatheringFuture(children, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700797 return outer
798
799
Yurii Karabase4fe3032020-11-28 10:21:17 +0200800def shield(arg):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700801 """Wait for a future, shielding it from cancellation.
802
803 The statement
804
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200805 res = await shield(something())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700806
807 is exactly equivalent to the statement
808
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200809 res = await something()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700810
811 *except* that if the coroutine containing it is cancelled, the
812 task running in something() is not cancelled. From the POV of
813 something(), the cancellation did not happen. But its caller is
814 still cancelled, so the yield-from expression still raises
815 CancelledError. Note: If something() is cancelled by other means
816 this will still cancel shield().
817
818 If you want to completely ignore cancellation (not recommended)
819 you can combine shield() with a try/except clause, as follows:
820
821 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200822 res = await shield(something())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700823 except CancelledError:
824 res = None
825 """
Yurii Karabase4fe3032020-11-28 10:21:17 +0200826 inner = ensure_future(arg)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700827 if inner.done():
828 # Shortcut.
829 return inner
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500830 loop = futures._get_loop(inner)
Yury Selivanov7661db62016-05-16 15:38:39 -0400831 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700832
Romain Picardb35acc52019-05-07 20:58:24 +0200833 def _inner_done_callback(inner):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700834 if outer.cancelled():
Victor Stinner3531d902015-01-09 01:42:52 +0100835 if not inner.cancelled():
836 # Mark inner's result as retrieved.
837 inner.exception()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700838 return
Victor Stinner3531d902015-01-09 01:42:52 +0100839
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700840 if inner.cancelled():
841 outer.cancel()
842 else:
843 exc = inner.exception()
844 if exc is not None:
845 outer.set_exception(exc)
846 else:
847 outer.set_result(inner.result())
848
Romain Picardb35acc52019-05-07 20:58:24 +0200849
850 def _outer_done_callback(outer):
851 if not inner.done():
852 inner.remove_done_callback(_inner_done_callback)
853
854 inner.add_done_callback(_inner_done_callback)
855 outer.add_done_callback(_outer_done_callback)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700856 return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700857
858
859def run_coroutine_threadsafe(coro, loop):
860 """Submit a coroutine object to a given event loop.
861
862 Return a concurrent.futures.Future to access the result.
863 """
864 if not coroutines.iscoroutine(coro):
865 raise TypeError('A coroutine object is required')
866 future = concurrent.futures.Future()
867
868 def callback():
Guido van Rossum601953b2015-10-05 16:20:00 -0700869 try:
870 futures._chain_future(ensure_future(coro, loop=loop), future)
Yury Selivanov431b5402019-05-27 14:45:12 +0200871 except (SystemExit, KeyboardInterrupt):
872 raise
873 except BaseException as exc:
Guido van Rossum601953b2015-10-05 16:20:00 -0700874 if future.set_running_or_notify_cancel():
875 future.set_exception(exc)
876 raise
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700877
878 loop.call_soon_threadsafe(callback)
879 return future
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200880
881
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500882# WeakSet containing all alive tasks.
883_all_tasks = weakref.WeakSet()
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200884
885# Dictionary containing tasks that are currently active in
886# all running event loops. {EventLoop: Task}
887_current_tasks = {}
888
889
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500890def _register_task(task):
891 """Register a new task in asyncio as executed by loop."""
892 _all_tasks.add(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200893
894
895def _enter_task(loop, task):
896 current_task = _current_tasks.get(loop)
897 if current_task is not None:
898 raise RuntimeError(f"Cannot enter into task {task!r} while another "
899 f"task {current_task!r} is being executed.")
900 _current_tasks[loop] = task
901
902
903def _leave_task(loop, task):
904 current_task = _current_tasks.get(loop)
905 if current_task is not task:
906 raise RuntimeError(f"Leaving task {task!r} does not match "
907 f"the current task {current_task!r}.")
908 del _current_tasks[loop]
909
910
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500911def _unregister_task(task):
912 """Unregister a task."""
913 _all_tasks.discard(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200914
915
916_py_register_task = _register_task
917_py_unregister_task = _unregister_task
918_py_enter_task = _enter_task
919_py_leave_task = _leave_task
920
921
922try:
923 from _asyncio import (_register_task, _unregister_task,
924 _enter_task, _leave_task,
925 _all_tasks, _current_tasks)
926except ImportError:
927 pass
928else:
929 _c_register_task = _register_task
930 _c_unregister_task = _unregister_task
931 _c_enter_task = _enter_task
932 _c_leave_task = _leave_task