blob: 0d3a24b7853f570cb7c9b2a80413901b8a17ec81 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
# Names exported by ``from <this module> import *``.
# NOTE(review): the underscore-prefixed entries on the last line are listed
# here on purpose in the original; their definitions are outside this view.
__all__ = (
    'Task', 'create_task',
    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
    'wait', 'wait_for', 'as_completed', 'sleep',
    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
    'current_task', 'all_tasks',
    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import concurrent.futures
Yury Selivanovf23746a2018-01-22 19:11:18 -050013import contextvars
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import functools
15import inspect
Alex Grönholmcca4eec2018-08-09 00:06:47 +030016import itertools
Andrew Svetlov5f841b52017-12-09 00:23:48 +020017import types
Yury Selivanov59eb9a42015-05-11 14:48:38 -040018import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019import weakref
20
Yury Selivanova0c1ba62016-10-28 12:52:37 -040021from . import base_tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020022from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070024from . import exceptions
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025from . import futures
Andrew Svetlov68b34a72019-05-16 17:52:10 +030026from .coroutines import _is_coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027
# Helper to generate new task names.
# Each call returns the next integer (1, 2, 3, ...); Task.__init__ uses it
# to build the default 'Task-<n>' name.
# This uses itertools.count() instead of a "+= 1" operation because the latter
# is not thread safe. See bpo-11866 for a longer explanation.
_task_name_counter = itertools.count(1).__next__
32
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033
def current_task(loop=None):
    """Return the task currently being executed on *loop*.

    When *loop* is None the running event loop is used, obtained through
    events.get_running_loop().  The task is looked up in the module-level
    _current_tasks registry.
    """
    target_loop = events.get_running_loop() if loop is None else loop
    return _current_tasks.get(target_loop)
39
40
def all_tasks(loop=None):
    """Return the set of not-yet-done tasks that belong to *loop*.

    When *loop* is None the running event loop is used.
    """
    if loop is None:
        loop = events.get_running_loop()

    # _all_tasks is a WeakSet and may be mutated from another thread while
    # it is being iterated, so take a list snapshot before filtering.
    # Taking the snapshot is itself an iteration and can raise
    # RuntimeError; retry, and give up (re-raising) on the 1000th failure.
    # See issues 34970 and 36607 for details.
    failures = 0
    while True:
        try:
            snapshot = list(_all_tasks)
        except RuntimeError:
            failures += 1
            if failures >= 1000:
                raise
            continue
        break

    live = set()
    for task in snapshot:
        if futures._get_loop(task) is loop and not task.done():
            live.add(task)
    return live
62
63
def _set_task_name(task, name):
    """Apply *name* to *task* through its set_name() method, if possible.

    Nothing happens when *name* is None or when *task* has no set_name
    attribute.
    """
    if name is None:
        return
    try:
        setter = task.set_name
    except AttributeError:
        # Task-like object without naming support: silently ignore.
        return
    setter(name)
72
73
class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task is not done:
    #
    # - Either _fut_waiter is None, and __step() is scheduled;
    # - or _fut_waiter is some Future, and __step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # __wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be __wakeup().

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    def __init__(self, coro, *, loop=None, name=None):
        """Wrap *coro* and schedule its first step on the loop.

        *name* becomes the task name (converted with str()); when it is
        None a name of the form 'Task-<n>' is generated.

        Raises TypeError if *coro* is not a coroutine according to
        coroutines.iscoroutine().
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the innermost recorded frame.
            # NOTE(review): presumably so the traceback points at the code
            # creating the task rather than at this constructor -- confirm
            # against the Future implementation.
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        # True when a cancellation was requested but has not yet been
        # delivered to the coroutine (see cancel() and __step()).
        self._must_cancel = False
        # The future the coroutine is currently blocked on, if any.
        self._fut_waiter = None
        self._coro = coro
        # Snapshot of the current context; every step and wakeup callback
        # is scheduled with it.
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    def __del__(self):
        """Report a still-pending task to the loop's exception handler."""
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    def __class_getitem__(cls, type):
        """Return the class unchanged, so ``Task[X]`` is simply ``Task``."""
        return cls

    def _repr_info(self):
        """Delegate to base_tasks._task_repr_info()."""
        return base_tasks._task_repr_info(self)

    def get_coro(self):
        """Return the coroutine object passed to the constructor."""
        return self._coro

    def get_name(self):
        """Return the task name."""
        return self._name

    def set_name(self, value):
        """Set the task name to ``str(value)``."""
        self._name = str(value)

    def set_result(self, result):
        """Always raise RuntimeError: the public setter is disabled."""
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        """Always raise RuntimeError: the public setter is disabled."""
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self, msg=None):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel(msg=msg):
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        # Record the request; __step() turns it into a CancelledError.
        self._must_cancel = True
        self._cancel_message = msg
        return True

    def __step(self, exc=None):
        """Advance the wrapped coroutine by one step.

        With *exc* None the coroutine is resumed with ``send(None)``;
        otherwise *exc* is thrown into it.  Depending on how the coroutine
        reacts, the task is completed (result, exception or cancellation),
        parked on the yielded future, or rescheduled.

        Raises InvalidStateError if the task is already done.
        KeyboardInterrupt and SystemExit raised by the coroutine are stored
        on the task and then re-raised to the caller.
        """
        if self.done():
            raise exceptions.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            # A cancellation is pending: deliver it now, unless the
            # exception being thrown is already a CancelledError.
            if not isinstance(exc, exceptions.CancelledError):
                exc = self._make_cancelled_error()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            # The coroutine returned; exc.value is its return value.
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().cancel(msg=self._cancel_message)
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError as exc:
            # Save the original exception so we can chain it later.
            self._cancelled_exc = exc
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
            # Record the exception on the task, then let it propagate.
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            # The coroutine yielded *result* instead of finishing.
            # A non-None _asyncio_future_blocking attribute marks it as a
            # Future-like object.
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        # Park on the future: clear its blocking flag and
                        # resume through __wakeup() when it completes.
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            # cancel() was called while the coroutine was
                            # running; forward it to the new waiter.
                            if self._fut_waiter.cancel(
                                    msg=self._cancel_message):
                                self._must_cancel = False
                else:
                    # The blocking flag is falsy but not None.
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        """Done-callback for the awaited future: resume the coroutine.

        If *future* finished with an exception (including cancellation),
        that exception is passed to __step(); otherwise __step() is called
        without arguments.
        """
        try:
            future.result()
        except BaseException as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.
317
318
# Keep the pure-Python class reachable under a private name, because the
# public name ``Task`` may be rebound just below.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    # The optional _asyncio module is unavailable; keep the Python Task.
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task
329
330
def create_task(coro, *, name=None):
    """Schedule the coroutine object *coro* as a Task on the running loop.

    The task is created with the running loop's create_task() method; if
    *name* is not None it is then applied through _set_task_name().

    Return the Task object.
    """
    new_task = events.get_running_loop().create_task(coro)
    _set_task_name(new_task, name)
    return new_task
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200340
341
# wait() and as_completed() similar to those in PEP 3148.

# Accepted values for the return_when argument of wait().  They are the
# very same objects as the concurrent.futures constants they alias.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
347
348
async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures and coroutines given by fs to complete.

    The fs iterable must not be empty.

    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = await asyncio.wait(fs)

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.

    Raises TypeError if fs is itself a future or a coroutine, and
    ValueError if fs yields no items or return_when is not one of
    FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")

    # Materialize fs *before* testing for emptiness.  A one-shot iterable
    # such as iter(...) or map(...) is always truthy, so checking it
    # directly would let an empty one slip through to _wait().
    fs = set(fs)
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError(f'Invalid return_when value: {return_when}')

    loop = events.get_running_loop()

    if any(coroutines.iscoroutine(f) for f in fs):
        warnings.warn("The explicit passing of coroutine objects to "
                      "asyncio.wait() is deprecated since Python 3.8, and "
                      "scheduled for removal in Python 3.11.",
                      DeprecationWarning, stacklevel=2)

    # Wrap whatever is not already a future; futures pass through as-is.
    fs = {ensure_future(f, loop=loop) for f in fs}

    return await _wait(fs, timeout, return_when, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700385
386
def _release_waiter(waiter, *args):
    """Complete *waiter* with None unless it is already done.

    Extra positional arguments are accepted and ignored, so the function
    can be used directly as a callback that receives arguments.
    """
    if waiter.done():
        return
    waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700390
391
async def wait_for(fut, timeout):
    """Wait for the single Future or coroutine to complete, with timeout.

    Coroutine will be wrapped in Task.

    Returns result of the Future or coroutine.  When a timeout occurs,
    it cancels the task and raises TimeoutError.  To avoid the task
    cancellation, wrap it in shield().

    If the wait is cancelled, the task is also cancelled, and wait_for()
    does not finish until the task has actually completed.

    A timeout of None waits without limit; a timeout <= 0 returns the
    result only if the future is already done and otherwise cancels it
    and raises TimeoutError.

    This function is a coroutine.
    """
    loop = events.get_running_loop()

    if timeout is None:
        return await fut

    if timeout <= 0:
        fut = ensure_future(fut, loop=loop)

        if fut.done():
            return fut.result()

        await _cancel_and_wait(fut, loop=loop)
        try:
            # Re-raises whatever the task raised while being cancelled.
            fut.result()
        except exceptions.CancelledError as exc:
            raise exceptions.TimeoutError() from exc
        else:
            raise exceptions.TimeoutError()

    # The waiter is released either by the timer or by fut completing,
    # whichever happens first.
    waiter = loop.create_future()
    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    cb = functools.partial(_release_waiter, waiter)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(cb)

    try:
        # wait until the future completes or the timeout
        try:
            await waiter
        except exceptions.CancelledError:
            if fut.done():
                return fut.result()
            else:
                fut.remove_done_callback(cb)
                # We must ensure that the task is not running
                # after wait_for() returns, so wait for the cancellation
                # to be processed instead of only requesting it.
                # See https://bugs.python.org/issue32751
                await _cancel_and_wait(fut, loop=loop)
                raise

        if fut.done():
            return fut.result()
        else:
            fut.remove_done_callback(cb)
            # We must ensure that the task is not running
            # after wait_for() returns.
            # See https://bugs.python.org/issue32751
            await _cancel_and_wait(fut, loop=loop)
            # In case task cancellation failed with some
            # exception, we should re-raise it
            # See https://bugs.python.org/issue40607
            try:
                fut.result()
            except exceptions.CancelledError as exc:
                raise exceptions.TimeoutError() from exc
            else:
                raise exceptions.TimeoutError()
    finally:
        timeout_handle.cancel()
462
463
async def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a non-empty collection of Futures.  Returns
    the pair of sets (done, pending) once the *return_when* condition is
    met or *timeout* (if not None) has elapsed.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    if timeout is None:
        timeout_handle = None
    else:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    remaining = len(fs)

    def _on_completion(f):
        # Done-callback attached to every future in fs.
        nonlocal remaining
        remaining -= 1
        if remaining <= 0:
            satisfied = True
        elif return_when == FIRST_COMPLETED:
            satisfied = True
        elif return_when == FIRST_EXCEPTION:
            satisfied = not f.cancelled() and f.exception() is not None
        else:
            satisfied = False
        if not satisfied:
            return
        if timeout_handle is not None:
            timeout_handle.cancel()
        if not waiter.done():
            waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        await waiter
    finally:
        # Always undo the timer and the callbacks, also on cancellation.
        if timeout_handle is not None:
            timeout_handle.cancel()
        for f in fs:
            f.remove_done_callback(_on_completion)

    done, pending = set(), set()
    for f in fs:
        (done if f.done() else pending).add(f)
    return done, pending
506
507
async def _cancel_and_wait(fut, loop):
    """Cancel the *fut* future or task and wait until it completes."""

    completion = loop.create_future()
    release = functools.partial(_release_waiter, completion)
    fut.add_done_callback(release)

    try:
        fut.cancel()
        # Await a separate future rather than *fut* itself, so that
        # _cancel_and_wait stays reliably cancellable.
        await completion
    finally:
        fut.remove_done_callback(release)
522
523
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700524# This is *not* a @coroutine! It is just an iterator (yielding Futures).
Yurii Karabase4fe3032020-11-28 10:21:17 +0200525def as_completed(fs, *, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800526 """Return an iterator whose values are coroutines.
527
528 When waiting for the yielded coroutines you'll get the results (or
529 exceptions!) of the original Futures (or coroutines), in the order
530 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700531
532 This differs from PEP 3148; the proper way to use this is:
533
534 for f in as_completed(fs):
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200535 result = await f # The 'await' may raise.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700536 # Use result.
537
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200538 If a timeout is specified, the 'await' will raise
Guido van Rossumb58f0532014-02-12 17:58:19 -0800539 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700540
541 Note: The futures 'f' are not necessarily members of fs.
542 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700543 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Jakub Stasiak3d86d092020-11-02 11:56:35 +0100544 raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
Andrew Svetlova4888792019-09-12 15:40:40 +0300545
Guido van Rossumb58f0532014-02-12 17:58:19 -0800546 from .queues import Queue # Import here to avoid circular import problem.
Yurii Karabas0ec34ca2020-11-24 20:08:54 +0200547 done = Queue()
Andrew Svetlova4888792019-09-12 15:40:40 +0300548
Yurii Karabase4fe3032020-11-28 10:21:17 +0200549 loop = events.get_event_loop()
Andrew Svetlova4888792019-09-12 15:40:40 +0300550 todo = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800551 timeout_handle = None
552
553 def _on_timeout():
554 for f in todo:
555 f.remove_done_callback(_on_completion)
556 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
557 todo.clear() # Can't do todo.remove(f) in the loop.
558
559 def _on_completion(f):
560 if not todo:
561 return # _on_timeout() was here first.
562 todo.remove(f)
563 done.put_nowait(f)
564 if not todo and timeout_handle is not None:
565 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700566
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200567 async def _wait_for_one():
568 f = await done.get()
Guido van Rossumb58f0532014-02-12 17:58:19 -0800569 if f is None:
570 # Dummy value from _on_timeout().
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700571 raise exceptions.TimeoutError
Guido van Rossumb58f0532014-02-12 17:58:19 -0800572 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700573
Guido van Rossumb58f0532014-02-12 17:58:19 -0800574 for f in todo:
575 f.add_done_callback(_on_completion)
576 if todo and timeout is not None:
577 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700578 for _ in range(len(todo)):
579 yield _wait_for_one()
580
581
@types.coroutine
def __sleep0():
    """Skip one event loop run cycle.

    This is a private helper for 'asyncio.sleep()', used
    when the 'delay' is set to 0.  It uses a bare 'yield'
    expression (which Task.__step knows how to handle)
    instead of creating a Future object.
    """
    # Yields None; Task.__step reschedules itself with call_soon() when
    # the coroutine yields None.
    yield
592
593
async def sleep(delay, result=None):
    """Coroutine that completes after a given time (in seconds).

    *result* is returned to the caller once the delay has passed.  A
    delay <= 0 only yields control to the event loop once.
    """
    if delay <= 0:
        await __sleep0()
        return result

    running_loop = events.get_running_loop()
    wakeup = running_loop.create_future()
    timer = running_loop.call_later(
        delay, futures._set_result_unless_cancelled, wakeup, result)
    try:
        return await wakeup
    finally:
        # Harmless if the timer already fired; required if we were
        # cancelled while waiting.
        timer.cancel()
609
610
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.

    Raises ValueError when a Future is passed together with a *loop* it
    does not belong to, and TypeError when the argument is neither a
    coroutine, a Future nor an awaitable.
    """
    if coroutines.iscoroutine(coro_or_future):
        target_loop = events.get_event_loop() if loop is None else loop
        new_task = target_loop.create_task(coro_or_future)
        if new_task._source_traceback:
            del new_task._source_traceback[-1]
        return new_task

    if futures.isfuture(coro_or_future):
        if loop is None or loop is futures._get_loop(coro_or_future):
            return coro_or_future
        raise ValueError('The future belongs to a different loop than '
                         'the one specified as the loop argument')

    if inspect.isawaitable(coro_or_future):
        # Turn the generic awaitable into a coroutine, then take the
        # coroutine branch above.
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)

    raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                    'required')
Yury Selivanov620279b2015-10-02 15:00:19 -0400633
634
@types.coroutine
def _wrap_awaitable(awaitable):
    """Adapt an object with __await__ into a coroutine.

    Helper for asyncio.ensure_future(), which later wraps the
    resulting coroutine in a Task.
    """
    outcome = yield from awaitable.__await__()
    return outcome


# Tag the helper with the module's coroutine marker object.
_wrap_awaitable._is_coroutine = _is_coroutine
645
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700646
class _GatheringFuture(futures.Future):
    """Outer future used by gather().

    Its cancel() forwards the request to all the children instead of
    immediately marking this future as cancelled, which makes it act
    more like Task.cancel().
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children
        # Becomes True once cancel() managed to cancel at least one child.
        self._cancel_requested = False

    def cancel(self, msg=None):
        if self.done():
            return False

        # Build a list (not a lazy generator) so that every child is
        # asked to cancel even after one of them reports success.
        outcomes = [child.cancel(msg=msg) for child in self._children]
        if not any(outcomes):
            return False

        # If any child tasks were actually cancelled, we should
        # propagate the cancellation request regardless of
        # *return_exceptions* argument.  See issue 32684.
        self._cancel_requested = True
        return True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700673
674
def gather(*coros_or_futures, return_exceptions=False):
    """Aggregate several coroutines/futures into a single future.

    Every coroutine argument is wrapped in a future and scheduled on
    the event loop; the scheduling order is not guaranteed to match
    the order of the arguments.

    All futures must share the same event loop.  Once all the tasks
    are done successfully, the result of the returned future is the
    list of their results, ordered like the original sequence and not
    by arrival.  If *return_exceptions* is True, exceptions raised by
    the tasks are treated as ordinary results and collected in that
    list; otherwise the first raised exception is propagated to the
    returned future immediately.

    Cancellation: cancelling the outer Future also cancels all the
    children that have not completed yet.  A cancelled child is
    treated as if it raised CancelledError -- the outer Future is
    *not* cancelled in this case, so that cancelling one child does
    not cause the other children to be cancelled.

    If *return_exceptions* is False, cancelling gather() after it
    has been marked done won't cancel any submitted awaitables.
    For instance, gather can be marked done after propagating an
    exception to the caller, therefore, calling ``gather.cancel()``
    after catching an exception (raised by one of the awaitables) from
    gather won't cancel any other awaitables.
    """
    if not coros_or_futures:
        # Nothing to wait for: hand back an already completed future.
        loop = events.get_event_loop()
        outer = loop.create_future()
        outer.set_result([])
        return outer

    def _outcome_of(child):
        """Return the entry that a finished *child* adds to the results."""
        if child.cancelled():
            # Test for cancellation first, as 'child.exception()' would
            # *raise* a CancelledError instead of returning it.  The
            # error is built by hand because it is stored as a value and
            # never raised, so __context__ is irrelevant; this also
            # keeps '_make_cancelled_error()' called at most once.
            message = child._cancel_message
            return exceptions.CancelledError(
                '' if message is None else message)
        error = child.exception()
        return child.result() if error is None else error

    def _done_callback(fut):
        nonlocal nfinished
        nfinished += 1

        if outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if not return_exceptions:
            if fut.cancelled():
                # Cancellation is checked before 'fut.exception()',
                # which would raise rather than return the error.
                outer.set_exception(fut._make_cancelled_error())
                return
            exc = fut.exception()
            if exc is not None:
                outer.set_exception(exc)
                return

        if nfinished != nfuts:
            # Some children are still pending.
            return

        # All futures are done; build the result list in argument order.
        results = [_outcome_of(child) for child in children]

        if outer._cancel_requested:
            # If gather is being cancelled we must propagate the
            # cancellation regardless of *return_exceptions* argument.
            # See issue 32684.  The error object is produced by the
            # last entry of 'children', which is not necessarily the
            # future that triggered this callback.
            outer.set_exception(children[-1]._make_cancelled_error())
        else:
            outer.set_result(results)

    arg_to_fut = {}
    children = []
    nfuts = 0
    nfinished = 0
    loop = None
    for arg in coros_or_futures:
        if arg in arg_to_fut:
            # A duplicate argument shares the future created earlier.
            children.append(arg_to_fut[arg])
            continue

        fut = ensure_future(arg, loop=loop)
        if loop is None:
            # The first child determines the loop used for the others.
            loop = futures._get_loop(fut)
        if fut is not arg:
            # 'arg' was not a Future, therefore, 'fut' is a new
            # Future created specifically for 'arg'.  Since the caller
            # can't control it, disable the "destroy pending task"
            # warning.
            fut._log_destroy_pending = False

        nfuts += 1
        arg_to_fut[arg] = fut
        fut.add_done_callback(_done_callback)
        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    return outer
795
796
def shield(arg):
    """Wait for a future while protecting it from cancellation.

    The statement

        res = await shield(something())

    is exactly equivalent to the statement

        res = await something()

    *except* that when the coroutine containing it is cancelled, the
    task running in something() is not cancelled.  From the POV of
    something(), the cancellation did not happen.  Its caller is
    still cancelled though, so the await expression still raises
    CancelledError.  Note: If something() is cancelled by other means
    this will still cancel shield().

    If you want to completely ignore cancellation (not recommended)
    you can combine shield() with a try/except clause, as follows:

        try:
            res = await shield(something())
        except CancelledError:
            res = None
    """
    inner = ensure_future(arg)
    if inner.done():
        # Shortcut: nothing is left to protect.
        return inner

    outer = futures._get_loop(inner).create_future()

    def _inner_done_callback(finished):
        if outer.cancelled():
            if not finished.cancelled():
                # Mark inner's result as retrieved.
                finished.exception()
            return

        if finished.cancelled():
            outer.cancel()
            return

        exc = finished.exception()
        if exc is None:
            outer.set_result(finished.result())
        else:
            outer.set_exception(exc)

    def _outer_done_callback(_outer):
        # Once the outer future is done, stop watching a still pending inner.
        if not inner.done():
            inner.remove_done_callback(_inner_done_callback)

    inner.add_done_callback(_inner_done_callback)
    outer.add_done_callback(_outer_done_callback)
    return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700854
855
def run_coroutine_threadsafe(coro, loop):
    """Hand a coroutine object over to the given event loop.

    The coroutine is scheduled through loop.call_soon_threadsafe().
    Returns a concurrent.futures.Future giving access to the result.
    Raises TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')

    result_future = concurrent.futures.Future()

    def _schedule():
        try:
            task = ensure_future(coro, loop=loop)
            futures._chain_future(task, result_future)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # Report the failure through the returned future as well,
            # unless that future has already been cancelled.
            if result_future.set_running_or_notify_cancel():
                result_future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(_schedule)
    return result_future
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200877
878
# WeakSet containing all alive tasks.  Entries are added by
# _register_task() and removed by _unregister_task().
_all_tasks = weakref.WeakSet()

# Dictionary containing tasks that are currently active in
# all running event loops. {EventLoop: Task}
# Entries are set by _enter_task() and deleted by _leave_task().
_current_tasks = {}
885
886
def _register_task(task):
    """Add *task* to the module-wide collection of all tasks."""
    _all_tasks.add(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200890
891
def _enter_task(loop, task):
    """Record *task* as the task currently executed by *loop*.

    Raises RuntimeError if *loop* already has a current task.
    """
    current_task = _current_tasks.get(loop)
    if current_task is None:
        _current_tasks[loop] = task
        return
    raise RuntimeError(f"Cannot enter into task {task!r} while another "
                       f"task {current_task!r} is being executed.")
898
899
def _leave_task(loop, task):
    """Clear the current task of *loop*, which must be *task*.

    Raises RuntimeError if the task recorded for *loop* is not *task*.
    """
    current_task = _current_tasks.get(loop)
    if current_task is task:
        del _current_tasks[loop]
        return
    raise RuntimeError(f"Leaving task {task!r} does not match "
                       f"the current task {current_task!r}.")
906
907
def _unregister_task(task):
    """Drop *task* from the collection of all tasks, if it is present."""
    _all_tasks.discard(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200911
912
# Keep references to the functions defined in this module under '_py_*'
# names.  This must happen before the import below, which may rebind
# the unprefixed names.
_py_register_task = _register_task
_py_unregister_task = _unregister_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task


# Use the implementations from the optional '_asyncio' module when it
# can be imported.  A successful import rebinds the four functions as
# well as the '_all_tasks' and '_current_tasks' registries.
try:
    from _asyncio import (_register_task, _unregister_task,
                          _enter_task, _leave_task,
                          _all_tasks, _current_tasks)
except ImportError:
    # '_asyncio' is unavailable: the definitions above stay in effect.
    pass
else:
    # Also expose the imported implementations under '_c_*' names.
    _c_register_task = _register_task
    _c_unregister_task = _unregister_task
    _c_enter_task = _enter_task
    _c_leave_task = _leave_task