blob: 8b05434f273b52ef9a2ed1c1304c56676c2ba598 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
Yury Selivanov6370f342017-12-10 18:36:12 -05003__all__ = (
Andrew Svetlovf74ef452017-12-15 07:04:38 +02004 'Task', 'create_task',
Yury Selivanov6370f342017-12-10 18:36:12 -05005 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
Yury Selivanov9edad3c2017-12-11 10:03:48 -05006 'wait', 'wait_for', 'as_completed', 'sleep',
Yury Selivanov6370f342017-12-10 18:36:12 -05007 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
Andrew Svetlov44d1a592017-12-16 21:58:38 +02008 'current_task', 'all_tasks',
9 '_register_task', '_unregister_task', '_enter_task', '_leave_task',
Yury Selivanov6370f342017-12-10 18:36:12 -050010)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import concurrent.futures
Yury Selivanovf23746a2018-01-22 19:11:18 -050013import contextvars
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import functools
15import inspect
Alex Grönholmcca4eec2018-08-09 00:06:47 +030016import itertools
Andrew Svetlov5f841b52017-12-09 00:23:48 +020017import types
Yury Selivanov59eb9a42015-05-11 14:48:38 -040018import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019import weakref
20
Yury Selivanova0c1ba62016-10-28 12:52:37 -040021from . import base_tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020022from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070024from . import exceptions
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025from . import futures
Andrew Svetlov68b34a72019-05-16 17:52:10 +030026from .coroutines import _is_coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027
Alex Grönholmcca4eec2018-08-09 00:06:47 +030028# Helper to generate new task names
29# This uses itertools.count() instead of a "+= 1" operation because the latter
30# is not thread safe. See bpo-11866 for a longer explanation.
31_task_name_counter = itertools.count(1).__next__
32
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033
Andrew Svetlov44d1a592017-12-16 21:58:38 +020034def current_task(loop=None):
35 """Return a currently executed task."""
36 if loop is None:
37 loop = events.get_running_loop()
38 return _current_tasks.get(loop)
39
40
41def all_tasks(loop=None):
42 """Return a set of all tasks for the loop."""
43 if loop is None:
Yury Selivanov416c1eb2018-05-28 17:54:02 -040044 loop = events.get_running_loop()
Andrew Svetlov65aa64f2019-06-11 18:27:30 +030045 # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
46 # thread while we do so. Therefore we cast it to list prior to filtering. The list
47 # cast itself requires iteration, so we repeat it several times ignoring
48 # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
49 # details.
50 i = 0
51 while True:
52 try:
53 tasks = list(_all_tasks)
54 except RuntimeError:
55 i += 1
56 if i >= 1000:
57 raise
58 else:
59 break
60 return {t for t in tasks
Yury Selivanov416c1eb2018-05-28 17:54:02 -040061 if futures._get_loop(t) is loop and not t.done()}
62
63
64def _all_tasks_compat(loop=None):
65 # Different from "all_task()" by returning *all* Tasks, including
66 # the completed ones. Used to implement deprecated "Tasks.all_task()"
67 # method.
68 if loop is None:
Andrew Svetlov44d1a592017-12-16 21:58:38 +020069 loop = events.get_event_loop()
Andrew Svetlov65aa64f2019-06-11 18:27:30 +030070 # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
71 # thread while we do so. Therefore we cast it to list prior to filtering. The list
72 # cast itself requires iteration, so we repeat it several times ignoring
73 # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
74 # details.
75 i = 0
76 while True:
77 try:
78 tasks = list(_all_tasks)
79 except RuntimeError:
80 i += 1
81 if i >= 1000:
82 raise
83 else:
84 break
85 return {t for t in tasks if futures._get_loop(t) is loop}
Andrew Svetlov44d1a592017-12-16 21:58:38 +020086
87
Alex Grönholmcca4eec2018-08-09 00:06:47 +030088def _set_task_name(task, name):
89 if name is not None:
90 try:
91 set_name = task.set_name
92 except AttributeError:
93 pass
94 else:
95 set_name(name)
96
97
Yury Selivanov0cf16f92017-12-25 10:48:15 -050098class Task(futures._PyFuture): # Inherit Python Task implementation
99 # from a Python Future implementation.
100
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700101 """A coroutine wrapped in a Future."""
102
103 # An important invariant maintained while a Task not done:
104 #
105 # - Either _fut_waiter is None, and _step() is scheduled;
106 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
107 #
108 # The only transition from the latter to the former is through
109 # _wakeup(). When _fut_waiter is not None, one of its callbacks
110 # must be _wakeup().
111
Victor Stinnerfe22e092014-12-04 23:00:13 +0100112 # If False, don't log a message if the task is destroyed whereas its
113 # status is still pending
114 _log_destroy_pending = True
115
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300116 def __init__(self, coro, *, loop=None, name=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700117 super().__init__(loop=loop)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200118 if self._source_traceback:
119 del self._source_traceback[-1]
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200120 if not coroutines.iscoroutine(coro):
121 # raise after Future.__init__(), attrs are required for __del__
122 # prevent logging for pending task in __del__
123 self._log_destroy_pending = False
124 raise TypeError(f"a coroutine was expected, got {coro!r}")
125
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300126 if name is None:
127 self._name = f'Task-{_task_name_counter()}'
128 else:
129 self._name = str(name)
130
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700131 self._must_cancel = False
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200132 self._fut_waiter = None
133 self._coro = coro
Yury Selivanovf23746a2018-01-22 19:11:18 -0500134 self._context = contextvars.copy_context()
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200135
Yury Selivanov22feeb82018-01-24 11:31:01 -0500136 self._loop.call_soon(self.__step, context=self._context)
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500137 _register_task(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700138
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900139 def __del__(self):
140 if self._state == futures._PENDING and self._log_destroy_pending:
141 context = {
142 'task': self,
143 'message': 'Task was destroyed but it is pending!',
144 }
145 if self._source_traceback:
146 context['source_traceback'] = self._source_traceback
147 self._loop.call_exception_handler(context)
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500148 super().__del__()
Victor Stinnera02f81f2014-06-24 22:37:53 +0200149
Batuhan Taşkayadec36722019-12-07 14:05:07 +0300150 def __class_getitem__(cls, type):
151 return cls
152
Victor Stinner313a9802014-07-29 12:58:23 +0200153 def _repr_info(self):
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400154 return base_tasks._task_repr_info(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700155
Alex Grönholm98ef9202019-05-30 18:30:09 +0300156 def get_coro(self):
157 return self._coro
158
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300159 def get_name(self):
160 return self._name
161
162 def set_name(self, value):
163 self._name = str(value)
164
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500165 def set_result(self, result):
166 raise RuntimeError('Task does not support set_result operation')
167
168 def set_exception(self, exception):
169 raise RuntimeError('Task does not support set_exception operation')
170
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700171 def get_stack(self, *, limit=None):
172 """Return the list of stack frames for this task's coroutine.
173
Victor Stinnerd87de832014-12-02 17:57:04 +0100174 If the coroutine is not done, this returns the stack where it is
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700175 suspended. If the coroutine has completed successfully or was
176 cancelled, this returns an empty list. If the coroutine was
177 terminated by an exception, this returns the list of traceback
178 frames.
179
180 The frames are always ordered from oldest to newest.
181
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500182 The optional limit gives the maximum number of frames to
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700183 return; by default all available frames are returned. Its
184 meaning differs depending on whether a stack or a traceback is
185 returned: the newest frames of a stack are returned, but the
186 oldest frames of a traceback are returned. (This matches the
187 behavior of the traceback module.)
188
189 For reasons beyond our control, only one stack frame is
190 returned for a suspended coroutine.
191 """
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400192 return base_tasks._task_get_stack(self, limit)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700193
194 def print_stack(self, *, limit=None, file=None):
195 """Print the stack or traceback for this task's coroutine.
196
197 This produces output similar to that of the traceback module,
198 for the frames retrieved by get_stack(). The limit argument
199 is passed to get_stack(). The file argument is an I/O stream
R David Murray8e069d52014-09-24 13:13:45 -0400200 to which the output is written; by default output is written
201 to sys.stderr.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700202 """
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400203 return base_tasks._task_print_stack(self, limit, file)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700204
Chris Jerdonek1ce58412020-05-15 16:55:50 -0700205 def cancel(self, msg=None):
R David Murray8e069d52014-09-24 13:13:45 -0400206 """Request that this task cancel itself.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200207
Victor Stinner8d213572014-06-02 23:06:46 +0200208 This arranges for a CancelledError to be thrown into the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200209 wrapped coroutine on the next cycle through the event loop.
210 The coroutine then has a chance to clean up or even deny
211 the request using try/except/finally.
212
R David Murray8e069d52014-09-24 13:13:45 -0400213 Unlike Future.cancel, this does not guarantee that the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200214 task will be cancelled: the exception might be caught and
R David Murray8e069d52014-09-24 13:13:45 -0400215 acted upon, delaying cancellation of the task or preventing
216 cancellation completely. The task may also return a value or
217 raise a different exception.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200218
219 Immediately after this method is called, Task.cancelled() will
220 not return True (unless the task was already cancelled). A
221 task will be marked as cancelled when the wrapped coroutine
222 terminates with a CancelledError exception (even if cancel()
223 was not called).
224 """
Yury Selivanov7ce1c6f2017-06-11 13:49:18 +0000225 self._log_traceback = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700226 if self.done():
227 return False
228 if self._fut_waiter is not None:
Chris Jerdonek1ce58412020-05-15 16:55:50 -0700229 if self._fut_waiter.cancel(msg=msg):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700230 # Leave self._fut_waiter; it may be a Task that
231 # catches and ignores the cancellation so we may have
232 # to cancel it again later.
233 return True
Yury Selivanov22feeb82018-01-24 11:31:01 -0500234 # It must be the case that self.__step is already scheduled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700235 self._must_cancel = True
Chris Jerdonek1ce58412020-05-15 16:55:50 -0700236 self._cancel_message = msg
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700237 return True
238
Yury Selivanov22feeb82018-01-24 11:31:01 -0500239 def __step(self, exc=None):
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500240 if self.done():
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700241 raise exceptions.InvalidStateError(
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500242 f'_step(): already done: {self!r}, {exc!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700243 if self._must_cancel:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700244 if not isinstance(exc, exceptions.CancelledError):
Chris Jerdonekda742ba2020-05-17 22:47:31 -0700245 exc = self._make_cancelled_error()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700246 self._must_cancel = False
247 coro = self._coro
248 self._fut_waiter = None
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800249
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200250 _enter_task(self._loop, self)
Yury Selivanovd59bba82015-11-20 12:41:03 -0500251 # Call either coro.throw(exc) or coro.send(None).
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700252 try:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500253 if exc is None:
254 # We use the `send` method directly, because coroutines
255 # don't have `__iter__` and `__next__` methods.
256 result = coro.send(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700257 else:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500258 result = coro.throw(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700259 except StopIteration as exc:
INADA Naoki991adca2017-05-11 21:18:38 +0900260 if self._must_cancel:
261 # Task is cancelled right before coro stops.
262 self._must_cancel = False
Chris Jerdonek1ce58412020-05-15 16:55:50 -0700263 super().cancel(msg=self._cancel_message)
INADA Naoki991adca2017-05-11 21:18:38 +0900264 else:
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500265 super().set_result(exc.value)
Chris Jerdonek1ce58412020-05-15 16:55:50 -0700266 except exceptions.CancelledError as exc:
Chris Jerdonekda742ba2020-05-17 22:47:31 -0700267 # Save the original exception so we can chain it later.
268 self._cancelled_exc = exc
269 super().cancel() # I.e., Future.cancel(self).
Yury Selivanov431b5402019-05-27 14:45:12 +0200270 except (KeyboardInterrupt, SystemExit) as exc:
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500271 super().set_exception(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700272 raise
Yury Selivanov431b5402019-05-27 14:45:12 +0200273 except BaseException as exc:
274 super().set_exception(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700275 else:
Guido van Rossum1140a032016-09-09 12:54:54 -0700276 blocking = getattr(result, '_asyncio_future_blocking', None)
277 if blocking is not None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700278 # Yielded Future must come from Future.__iter__().
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500279 if futures._get_loop(result) is not self._loop:
Yury Selivanov6370f342017-12-10 18:36:12 -0500280 new_exc = RuntimeError(
281 f'Task {self!r} got Future '
282 f'{result!r} attached to a different loop')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500283 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500284 self.__step, new_exc, context=self._context)
Guido van Rossum1140a032016-09-09 12:54:54 -0700285 elif blocking:
Yury Selivanov4145c832016-10-09 12:19:12 -0400286 if result is self:
Yury Selivanov6370f342017-12-10 18:36:12 -0500287 new_exc = RuntimeError(
288 f'Task cannot await on itself: {self!r}')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500289 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500290 self.__step, new_exc, context=self._context)
Yury Selivanov4145c832016-10-09 12:19:12 -0400291 else:
292 result._asyncio_future_blocking = False
Yury Selivanovf23746a2018-01-22 19:11:18 -0500293 result.add_done_callback(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500294 self.__wakeup, context=self._context)
Yury Selivanov4145c832016-10-09 12:19:12 -0400295 self._fut_waiter = result
296 if self._must_cancel:
Chris Jerdonek1ce58412020-05-15 16:55:50 -0700297 if self._fut_waiter.cancel(
298 msg=self._cancel_message):
Yury Selivanov4145c832016-10-09 12:19:12 -0400299 self._must_cancel = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700300 else:
Yury Selivanov6370f342017-12-10 18:36:12 -0500301 new_exc = RuntimeError(
302 f'yield was used instead of yield from '
303 f'in task {self!r} with {result!r}')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500304 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500305 self.__step, new_exc, context=self._context)
Yury Selivanov6370f342017-12-10 18:36:12 -0500306
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700307 elif result is None:
308 # Bare yield relinquishes control for one event loop iteration.
Yury Selivanov22feeb82018-01-24 11:31:01 -0500309 self._loop.call_soon(self.__step, context=self._context)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700310 elif inspect.isgenerator(result):
311 # Yielding a generator is just wrong.
Yury Selivanov6370f342017-12-10 18:36:12 -0500312 new_exc = RuntimeError(
313 f'yield was used instead of yield from for '
Serhiy Storchaka66553542018-05-20 16:30:31 +0300314 f'generator in task {self!r} with {result!r}')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500315 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500316 self.__step, new_exc, context=self._context)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700317 else:
318 # Yielding something else is an error.
Yury Selivanov6370f342017-12-10 18:36:12 -0500319 new_exc = RuntimeError(f'Task got bad yield: {result!r}')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500320 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500321 self.__step, new_exc, context=self._context)
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800322 finally:
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200323 _leave_task(self._loop, self)
Victor Stinnerd74ac822014-03-04 23:07:08 +0100324 self = None # Needed to break cycles when an exception occurs.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700325
Yury Selivanov22feeb82018-01-24 11:31:01 -0500326 def __wakeup(self, future):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700327 try:
Yury Selivanova4afc482015-11-16 15:12:10 -0500328 future.result()
Yury Selivanov431b5402019-05-27 14:45:12 +0200329 except BaseException as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700330 # This may also be a cancellation.
Yury Selivanov22feeb82018-01-24 11:31:01 -0500331 self.__step(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700332 else:
Yury Selivanova4afc482015-11-16 15:12:10 -0500333 # Don't pass the value of `future.result()` explicitly,
334 # as `Future.__iter__` and `Future.__await__` don't need it.
335 # If we call `_step(value, None)` instead of `_step()`,
336 # Python eval loop would use `.send(value)` method call,
337 # instead of `__next__()`, which is slower for futures
338 # that return non-generator iterators from their `__iter__`.
Yury Selivanov22feeb82018-01-24 11:31:01 -0500339 self.__step()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700340 self = None # Needed to break cycles when an exception occurs.
341
342
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400343_PyTask = Task
344
345
346try:
347 import _asyncio
348except ImportError:
349 pass
350else:
351 # _CTask is needed for tests.
352 Task = _CTask = _asyncio.Task
353
354
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300355def create_task(coro, *, name=None):
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200356 """Schedule the execution of a coroutine object in a spawn task.
357
358 Return a Task object.
359 """
360 loop = events.get_running_loop()
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300361 task = loop.create_task(coro)
362 _set_task_name(task, name)
363 return task
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200364
365
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700366# wait() and as_completed() similar to those in PEP 3148.
367
368FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
369FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
370ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
371
372
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200373async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700374 """Wait for the Futures and coroutines given by fs to complete.
375
Victor Stinnerdb74d982014-06-10 11:16:05 +0200376 The sequence futures must not be empty.
377
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700378 Coroutines will be wrapped in Tasks.
379
380 Returns two sets of Future: (done, pending).
381
382 Usage:
383
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200384 done, pending = await asyncio.wait(fs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700385
386 Note: This does not raise TimeoutError! Futures that aren't done
387 when the timeout occurs are returned in the second set.
388 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700389 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Yury Selivanov6370f342017-12-10 18:36:12 -0500390 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700391 if not fs:
392 raise ValueError('Set of coroutines/Futures is empty.')
Victor Stinnere931f7b2014-07-16 18:50:39 +0200393 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
Yury Selivanov6370f342017-12-10 18:36:12 -0500394 raise ValueError(f'Invalid return_when value: {return_when}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700395
396 if loop is None:
João Júnior558c49b2018-09-24 06:51:22 -0300397 loop = events.get_running_loop()
398 else:
Matthias Bussonnierd0ebf132019-05-20 23:20:10 -0700399 warnings.warn("The loop argument is deprecated since Python 3.8, "
400 "and scheduled for removal in Python 3.10.",
João Júnior558c49b2018-09-24 06:51:22 -0300401 DeprecationWarning, stacklevel=2)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700402
Kyle Stanley89aa7f02019-12-30 06:50:19 -0500403 if any(coroutines.iscoroutine(f) for f in set(fs)):
404 warnings.warn("The explicit passing of coroutine objects to "
405 "asyncio.wait() is deprecated since Python 3.8, and "
406 "scheduled for removal in Python 3.11.",
407 DeprecationWarning, stacklevel=2)
408
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400409 fs = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700410
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200411 return await _wait(fs, timeout, return_when, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700412
413
Victor Stinner59e08022014-08-28 11:19:25 +0200414def _release_waiter(waiter, *args):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700415 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200416 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700417
418
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200419async def wait_for(fut, timeout, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700420 """Wait for the single Future or coroutine to complete, with timeout.
421
422 Coroutine will be wrapped in Task.
423
Victor Stinner421e49b2014-01-23 17:40:59 +0100424 Returns result of the Future or coroutine. When a timeout occurs,
425 it cancels the task and raises TimeoutError. To avoid the task
426 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700427
Victor Stinner922bc2c2015-01-15 16:29:10 +0100428 If the wait is cancelled, the task is also cancelled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700429
Victor Stinner922bc2c2015-01-15 16:29:10 +0100430 This function is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700431 """
432 if loop is None:
João Júnior558c49b2018-09-24 06:51:22 -0300433 loop = events.get_running_loop()
434 else:
Matthias Bussonnierd0ebf132019-05-20 23:20:10 -0700435 warnings.warn("The loop argument is deprecated since Python 3.8, "
436 "and scheduled for removal in Python 3.10.",
João Júnior558c49b2018-09-24 06:51:22 -0300437 DeprecationWarning, stacklevel=2)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700438
Guido van Rossum48c66c32014-01-29 14:30:38 -0800439 if timeout is None:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200440 return await fut
Guido van Rossum48c66c32014-01-29 14:30:38 -0800441
Victor K4d071892017-10-05 19:04:39 +0300442 if timeout <= 0:
443 fut = ensure_future(fut, loop=loop)
444
445 if fut.done():
446 return fut.result()
447
Miss Islington (bot)1036ccb2020-08-26 10:14:59 -0700448 await _cancel_and_wait(fut, loop=loop)
449 try:
450 fut.result()
451 except exceptions.CancelledError as exc:
452 raise exceptions.TimeoutError() from exc
453 else:
454 raise exceptions.TimeoutError()
Victor K4d071892017-10-05 19:04:39 +0300455
Yury Selivanov7661db62016-05-16 15:38:39 -0400456 waiter = loop.create_future()
Victor Stinner59e08022014-08-28 11:19:25 +0200457 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
458 cb = functools.partial(_release_waiter, waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700459
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400460 fut = ensure_future(fut, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700461 fut.add_done_callback(cb)
462
463 try:
Victor Stinner59e08022014-08-28 11:19:25 +0200464 # wait until the future completes or the timeout
Victor Stinner922bc2c2015-01-15 16:29:10 +0100465 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200466 await waiter
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700467 except exceptions.CancelledError:
Miss Islington (bot)9de6be42020-08-26 10:15:35 -0700468 if fut.done():
469 return fut.result()
470 else:
471 fut.remove_done_callback(cb)
472 fut.cancel()
473 raise
Victor Stinner59e08022014-08-28 11:19:25 +0200474
475 if fut.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700476 return fut.result()
477 else:
478 fut.remove_done_callback(cb)
Elvis Pranskevichuse2b340a2018-05-29 17:31:01 -0400479 # We must ensure that the task is not running
480 # after wait_for() returns.
481 # See https://bugs.python.org/issue32751
482 await _cancel_and_wait(fut, loop=loop)
romasku382a5632020-05-15 23:12:05 +0300483 # In case task cancellation failed with some
484 # exception, we should re-raise it
485 # See https://bugs.python.org/issue40607
486 try:
487 fut.result()
488 except exceptions.CancelledError as exc:
489 raise exceptions.TimeoutError() from exc
490 else:
491 raise exceptions.TimeoutError()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700492 finally:
493 timeout_handle.cancel()
494
495
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200496async def _wait(fs, timeout, return_when, loop):
Elvis Pranskevichuse2b340a2018-05-29 17:31:01 -0400497 """Internal helper for wait().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700498
499 The fs argument must be a collection of Futures.
500 """
501 assert fs, 'Set of Futures is empty.'
Yury Selivanov7661db62016-05-16 15:38:39 -0400502 waiter = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700503 timeout_handle = None
504 if timeout is not None:
505 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
506 counter = len(fs)
507
508 def _on_completion(f):
509 nonlocal counter
510 counter -= 1
511 if (counter <= 0 or
512 return_when == FIRST_COMPLETED or
513 return_when == FIRST_EXCEPTION and (not f.cancelled() and
514 f.exception() is not None)):
515 if timeout_handle is not None:
516 timeout_handle.cancel()
517 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200518 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700519
520 for f in fs:
521 f.add_done_callback(_on_completion)
522
523 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200524 await waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700525 finally:
526 if timeout_handle is not None:
527 timeout_handle.cancel()
gescheitc1964e92019-05-03 18:18:02 +0300528 for f in fs:
529 f.remove_done_callback(_on_completion)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700530
531 done, pending = set(), set()
532 for f in fs:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700533 if f.done():
534 done.add(f)
535 else:
536 pending.add(f)
537 return done, pending
538
539
Elvis Pranskevichuse2b340a2018-05-29 17:31:01 -0400540async def _cancel_and_wait(fut, loop):
541 """Cancel the *fut* future or task and wait until it completes."""
542
543 waiter = loop.create_future()
544 cb = functools.partial(_release_waiter, waiter)
545 fut.add_done_callback(cb)
546
547 try:
548 fut.cancel()
549 # We cannot wait on *fut* directly to make
550 # sure _cancel_and_wait itself is reliably cancellable.
551 await waiter
552 finally:
553 fut.remove_done_callback(cb)
554
555
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700556# This is *not* a @coroutine! It is just an iterator (yielding Futures).
557def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800558 """Return an iterator whose values are coroutines.
559
560 When waiting for the yielded coroutines you'll get the results (or
561 exceptions!) of the original Futures (or coroutines), in the order
562 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700563
564 This differs from PEP 3148; the proper way to use this is:
565
566 for f in as_completed(fs):
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200567 result = await f # The 'await' may raise.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700568 # Use result.
569
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200570 If a timeout is specified, the 'await' will raise
Guido van Rossumb58f0532014-02-12 17:58:19 -0800571 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700572
573 Note: The futures 'f' are not necessarily members of fs.
574 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700575 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Yury Selivanov6370f342017-12-10 18:36:12 -0500576 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
Andrew Svetlova4888792019-09-12 15:40:40 +0300577
Guido van Rossumb58f0532014-02-12 17:58:19 -0800578 from .queues import Queue # Import here to avoid circular import problem.
579 done = Queue(loop=loop)
Andrew Svetlova4888792019-09-12 15:40:40 +0300580
581 if loop is None:
582 loop = events.get_event_loop()
583 else:
584 warnings.warn("The loop argument is deprecated since Python 3.8, "
585 "and scheduled for removal in Python 3.10.",
586 DeprecationWarning, stacklevel=2)
587 todo = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800588 timeout_handle = None
589
590 def _on_timeout():
591 for f in todo:
592 f.remove_done_callback(_on_completion)
593 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
594 todo.clear() # Can't do todo.remove(f) in the loop.
595
596 def _on_completion(f):
597 if not todo:
598 return # _on_timeout() was here first.
599 todo.remove(f)
600 done.put_nowait(f)
601 if not todo and timeout_handle is not None:
602 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700603
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200604 async def _wait_for_one():
605 f = await done.get()
Guido van Rossumb58f0532014-02-12 17:58:19 -0800606 if f is None:
607 # Dummy value from _on_timeout().
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700608 raise exceptions.TimeoutError
Guido van Rossumb58f0532014-02-12 17:58:19 -0800609 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700610
Guido van Rossumb58f0532014-02-12 17:58:19 -0800611 for f in todo:
612 f.add_done_callback(_on_completion)
613 if todo and timeout is not None:
614 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700615 for _ in range(len(todo)):
616 yield _wait_for_one()
617
618
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200619@types.coroutine
620def __sleep0():
621 """Skip one event loop run cycle.
622
623 This is a private helper for 'asyncio.sleep()', used
624 when the 'delay' is set to 0. It uses a bare 'yield'
Yury Selivanov22feeb82018-01-24 11:31:01 -0500625 expression (which Task.__step knows how to handle)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200626 instead of creating a Future object.
627 """
628 yield
629
630
631async def sleep(delay, result=None, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700632 """Coroutine that completes after a given time (in seconds)."""
Andrew Svetlov5382c052017-12-17 16:41:30 +0200633 if delay <= 0:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200634 await __sleep0()
Yury Selivanovade04122015-11-05 14:29:04 -0500635 return result
636
Yury Selivanov7661db62016-05-16 15:38:39 -0400637 if loop is None:
João Júnior558c49b2018-09-24 06:51:22 -0300638 loop = events.get_running_loop()
639 else:
Matthias Bussonnierd0ebf132019-05-20 23:20:10 -0700640 warnings.warn("The loop argument is deprecated since Python 3.8, "
641 "and scheduled for removal in Python 3.10.",
João Júnior558c49b2018-09-24 06:51:22 -0300642 DeprecationWarning, stacklevel=2)
643
Yury Selivanov7661db62016-05-16 15:38:39 -0400644 future = loop.create_future()
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500645 h = loop.call_later(delay,
646 futures._set_result_unless_cancelled,
647 future, result)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700648 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200649 return await future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700650 finally:
651 h.cancel()
652
653
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400654def ensure_future(coro_or_future, *, loop=None):
Yury Selivanov620279b2015-10-02 15:00:19 -0400655 """Wrap a coroutine or an awaitable in a future.
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400656
657 If the argument is a Future, it is returned directly.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700658 """
jimmylaie549c4b2018-05-28 06:42:05 -1000659 if coroutines.iscoroutine(coro_or_future):
Victor Stinner896a25a2014-07-08 11:29:25 +0200660 if loop is None:
661 loop = events.get_event_loop()
662 task = loop.create_task(coro_or_future)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200663 if task._source_traceback:
664 del task._source_traceback[-1]
665 return task
jimmylaie549c4b2018-05-28 06:42:05 -1000666 elif futures.isfuture(coro_or_future):
667 if loop is not None and loop is not futures._get_loop(coro_or_future):
Zackery Spytz4737b922019-05-03 09:35:26 -0600668 raise ValueError('The future belongs to a different loop than '
669 'the one specified as the loop argument')
jimmylaie549c4b2018-05-28 06:42:05 -1000670 return coro_or_future
Victor Stinner3f438a92017-11-28 14:43:52 +0100671 elif inspect.isawaitable(coro_or_future):
Yury Selivanov620279b2015-10-02 15:00:19 -0400672 return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700673 else:
Charles Renwickae5b3262017-04-21 16:49:48 -0400674 raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
675 'required')
Yury Selivanov620279b2015-10-02 15:00:19 -0400676
677
Andrew Svetlov68b34a72019-05-16 17:52:10 +0300678@types.coroutine
Yury Selivanov620279b2015-10-02 15:00:19 -0400679def _wrap_awaitable(awaitable):
680 """Helper for asyncio.ensure_future().
681
682 Wraps awaitable (an object with __await__) into a coroutine
683 that will later be wrapped in a Task by ensure_future().
684 """
685 return (yield from awaitable.__await__())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700686
Andrew Svetlov68b34a72019-05-16 17:52:10 +0300687_wrap_awaitable._is_coroutine = _is_coroutine
688
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700689
690class _GatheringFuture(futures.Future):
691 """Helper for gather().
692
693 This overrides cancel() to cancel all the children and act more
694 like Task.cancel(), which doesn't immediately mark itself as
695 cancelled.
696 """
697
698 def __init__(self, children, *, loop=None):
699 super().__init__(loop=loop)
700 self._children = children
Yury Selivanov863b6742018-05-29 17:20:02 -0400701 self._cancel_requested = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700702
Chris Jerdonek1ce58412020-05-15 16:55:50 -0700703 def cancel(self, msg=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700704 if self.done():
705 return False
Yury Selivanov3d676152016-10-21 17:22:17 -0400706 ret = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700707 for child in self._children:
Chris Jerdonek1ce58412020-05-15 16:55:50 -0700708 if child.cancel(msg=msg):
Yury Selivanov3d676152016-10-21 17:22:17 -0400709 ret = True
Yury Selivanov863b6742018-05-29 17:20:02 -0400710 if ret:
711 # If any child tasks were actually cancelled, we should
712 # propagate the cancellation request regardless of
713 # *return_exceptions* argument. See issue 32684.
714 self._cancel_requested = True
Yury Selivanov3d676152016-10-21 17:22:17 -0400715 return ret
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700716
717
718def gather(*coros_or_futures, loop=None, return_exceptions=False):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500719 """Return a future aggregating results from the given coroutines/futures.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700720
Guido van Rossume3c65a72016-09-30 08:17:15 -0700721 Coroutines will be wrapped in a future and scheduled in the event
722 loop. They will not necessarily be scheduled in the same order as
723 passed in.
724
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700725 All futures must share the same event loop. If all the tasks are
726 done successfully, the returned future's result is the list of
727 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500728 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700729 exceptions in the tasks are treated the same as successful
730 results, and gathered in the result list; otherwise, the first
731 raised exception will be immediately propagated to the returned
732 future.
733
734 Cancellation: if the outer Future is cancelled, all children (that
735 have not completed yet) are also cancelled. If any child is
736 cancelled, this is treated as if it raised CancelledError --
737 the outer Future is *not* cancelled in this case. (This is to
738 prevent the cancellation of one child to cause other children to
739 be cancelled.)
Miss Islington (bot)46634b72020-07-20 02:01:39 -0700740
741 If *return_exceptions* is False, cancelling gather() after it
742 has been marked done won't cancel any submitted awaitables.
743 For instance, gather can be marked done after propagating an
744 exception to the caller, therefore, calling ``gather.cancel()``
745 after catching an exception (raised by one of the awaitables) from
746 gather won't cancel any other awaitables.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700747 """
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200748 if not coros_or_futures:
Yury Selivanov7661db62016-05-16 15:38:39 -0400749 if loop is None:
750 loop = events.get_event_loop()
Andrew Svetlova4888792019-09-12 15:40:40 +0300751 else:
752 warnings.warn("The loop argument is deprecated since Python 3.8, "
753 "and scheduled for removal in Python 3.10.",
754 DeprecationWarning, stacklevel=2)
Yury Selivanov7661db62016-05-16 15:38:39 -0400755 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700756 outer.set_result([])
757 return outer
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200758
Yury Selivanov36c2c042017-12-19 07:19:53 -0500759 def _done_callback(fut):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700760 nonlocal nfinished
Yury Selivanov36c2c042017-12-19 07:19:53 -0500761 nfinished += 1
762
Victor Stinner3531d902015-01-09 01:42:52 +0100763 if outer.done():
764 if not fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700765 # Mark exception retrieved.
766 fut.exception()
767 return
Victor Stinner3531d902015-01-09 01:42:52 +0100768
Yury Selivanov36c2c042017-12-19 07:19:53 -0500769 if not return_exceptions:
770 if fut.cancelled():
771 # Check if 'fut' is cancelled first, as
772 # 'fut.exception()' will *raise* a CancelledError
773 # instead of returning it.
Chris Jerdonekda742ba2020-05-17 22:47:31 -0700774 exc = fut._make_cancelled_error()
Yury Selivanov36c2c042017-12-19 07:19:53 -0500775 outer.set_exception(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700776 return
Yury Selivanov36c2c042017-12-19 07:19:53 -0500777 else:
778 exc = fut.exception()
779 if exc is not None:
780 outer.set_exception(exc)
781 return
782
783 if nfinished == nfuts:
784 # All futures are done; create a list of results
785 # and set it to the 'outer' future.
786 results = []
787
788 for fut in children:
789 if fut.cancelled():
Chris Jerdonekda742ba2020-05-17 22:47:31 -0700790 # Check if 'fut' is cancelled first, as 'fut.exception()'
791 # will *raise* a CancelledError instead of returning it.
792 # Also, since we're adding the exception return value
793 # to 'results' instead of raising it, don't bother
794 # setting __context__. This also lets us preserve
795 # calling '_make_cancelled_error()' at most once.
Chris Jerdonek1ce58412020-05-15 16:55:50 -0700796 res = exceptions.CancelledError(
797 '' if fut._cancel_message is None else
798 fut._cancel_message)
Yury Selivanov36c2c042017-12-19 07:19:53 -0500799 else:
800 res = fut.exception()
801 if res is None:
802 res = fut.result()
803 results.append(res)
804
Yury Selivanov863b6742018-05-29 17:20:02 -0400805 if outer._cancel_requested:
806 # If gather is being cancelled we must propagate the
807 # cancellation regardless of *return_exceptions* argument.
808 # See issue 32684.
Chris Jerdonekda742ba2020-05-17 22:47:31 -0700809 exc = fut._make_cancelled_error()
Chris Jerdonek1ce58412020-05-15 16:55:50 -0700810 outer.set_exception(exc)
Yury Selivanov863b6742018-05-29 17:20:02 -0400811 else:
812 outer.set_result(results)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700813
Yury Selivanov36c2c042017-12-19 07:19:53 -0500814 arg_to_fut = {}
815 children = []
816 nfuts = 0
817 nfinished = 0
818 for arg in coros_or_futures:
819 if arg not in arg_to_fut:
820 fut = ensure_future(arg, loop=loop)
821 if loop is None:
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500822 loop = futures._get_loop(fut)
Yury Selivanov36c2c042017-12-19 07:19:53 -0500823 if fut is not arg:
824 # 'arg' was not a Future, therefore, 'fut' is a new
825 # Future created specifically for 'arg'. Since the caller
826 # can't control it, disable the "destroy pending task"
827 # warning.
828 fut._log_destroy_pending = False
829
830 nfuts += 1
831 arg_to_fut[arg] = fut
832 fut.add_done_callback(_done_callback)
833
834 else:
835 # There's a duplicate Future object in coros_or_futures.
836 fut = arg_to_fut[arg]
837
838 children.append(fut)
839
840 outer = _GatheringFuture(children, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700841 return outer
842
843
844def shield(arg, *, loop=None):
845 """Wait for a future, shielding it from cancellation.
846
847 The statement
848
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200849 res = await shield(something())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700850
851 is exactly equivalent to the statement
852
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200853 res = await something()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700854
855 *except* that if the coroutine containing it is cancelled, the
856 task running in something() is not cancelled. From the POV of
857 something(), the cancellation did not happen. But its caller is
858 still cancelled, so the yield-from expression still raises
859 CancelledError. Note: If something() is cancelled by other means
860 this will still cancel shield().
861
862 If you want to completely ignore cancellation (not recommended)
863 you can combine shield() with a try/except clause, as follows:
864
865 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200866 res = await shield(something())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700867 except CancelledError:
868 res = None
869 """
Andrew Svetlova4888792019-09-12 15:40:40 +0300870 if loop is not None:
871 warnings.warn("The loop argument is deprecated since Python 3.8, "
872 "and scheduled for removal in Python 3.10.",
873 DeprecationWarning, stacklevel=2)
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400874 inner = ensure_future(arg, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700875 if inner.done():
876 # Shortcut.
877 return inner
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500878 loop = futures._get_loop(inner)
Yury Selivanov7661db62016-05-16 15:38:39 -0400879 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700880
Romain Picardb35acc52019-05-07 20:58:24 +0200881 def _inner_done_callback(inner):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700882 if outer.cancelled():
Victor Stinner3531d902015-01-09 01:42:52 +0100883 if not inner.cancelled():
884 # Mark inner's result as retrieved.
885 inner.exception()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700886 return
Victor Stinner3531d902015-01-09 01:42:52 +0100887
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700888 if inner.cancelled():
889 outer.cancel()
890 else:
891 exc = inner.exception()
892 if exc is not None:
893 outer.set_exception(exc)
894 else:
895 outer.set_result(inner.result())
896
Romain Picardb35acc52019-05-07 20:58:24 +0200897
898 def _outer_done_callback(outer):
899 if not inner.done():
900 inner.remove_done_callback(_inner_done_callback)
901
902 inner.add_done_callback(_inner_done_callback)
903 outer.add_done_callback(_outer_done_callback)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700904 return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700905
906
907def run_coroutine_threadsafe(coro, loop):
908 """Submit a coroutine object to a given event loop.
909
910 Return a concurrent.futures.Future to access the result.
911 """
912 if not coroutines.iscoroutine(coro):
913 raise TypeError('A coroutine object is required')
914 future = concurrent.futures.Future()
915
916 def callback():
Guido van Rossum601953b2015-10-05 16:20:00 -0700917 try:
918 futures._chain_future(ensure_future(coro, loop=loop), future)
Yury Selivanov431b5402019-05-27 14:45:12 +0200919 except (SystemExit, KeyboardInterrupt):
920 raise
921 except BaseException as exc:
Guido van Rossum601953b2015-10-05 16:20:00 -0700922 if future.set_running_or_notify_cancel():
923 future.set_exception(exc)
924 raise
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700925
926 loop.call_soon_threadsafe(callback)
927 return future
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200928
929
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500930# WeakSet containing all alive tasks.
931_all_tasks = weakref.WeakSet()
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200932
933# Dictionary containing tasks that are currently active in
934# all running event loops. {EventLoop: Task}
935_current_tasks = {}
936
937
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500938def _register_task(task):
939 """Register a new task in asyncio as executed by loop."""
940 _all_tasks.add(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200941
942
943def _enter_task(loop, task):
944 current_task = _current_tasks.get(loop)
945 if current_task is not None:
946 raise RuntimeError(f"Cannot enter into task {task!r} while another "
947 f"task {current_task!r} is being executed.")
948 _current_tasks[loop] = task
949
950
951def _leave_task(loop, task):
952 current_task = _current_tasks.get(loop)
953 if current_task is not task:
954 raise RuntimeError(f"Leaving task {task!r} does not match "
955 f"the current task {current_task!r}.")
956 del _current_tasks[loop]
957
958
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500959def _unregister_task(task):
960 """Unregister a task."""
961 _all_tasks.discard(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200962
963
964_py_register_task = _register_task
965_py_unregister_task = _unregister_task
966_py_enter_task = _enter_task
967_py_leave_task = _leave_task
968
969
970try:
971 from _asyncio import (_register_task, _unregister_task,
972 _enter_task, _leave_task,
973 _all_tasks, _current_tasks)
974except ImportError:
975 pass
976else:
977 _c_register_task = _register_task
978 _c_unregister_task = _unregister_task
979 _c_enter_task = _enter_task
980 _c_leave_task = _leave_task