blob: 402c6e26a79b08ce10ac6091cb0906a15fcffc2c [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
# Names exported by ``from <this module> import *``.  The tuple also lists
# four underscore-prefixed task-bookkeeping hooks.
__all__ = (
    # Task class and factory.
    'Task',
    'create_task',
    # return_when constants accepted by wait().
    'FIRST_COMPLETED',
    'FIRST_EXCEPTION',
    'ALL_COMPLETED',
    # Waiting primitives.
    'wait',
    'wait_for',
    'as_completed',
    'sleep',
    # Aggregation / wrapping helpers.
    'gather',
    'shield',
    'ensure_future',
    'run_coroutine_threadsafe',
    # Introspection.
    'current_task',
    'all_tasks',
    # Bookkeeping hooks.
    '_register_task',
    '_unregister_task',
    '_enter_task',
    '_leave_task',
)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import concurrent.futures
Yury Selivanovf23746a2018-01-22 19:11:18 -050013import contextvars
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import functools
15import inspect
Andrew Svetlov5f841b52017-12-09 00:23:48 +020016import types
Yury Selivanov59eb9a42015-05-11 14:48:38 -040017import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018import weakref
19
Yury Selivanova0c1ba62016-10-28 12:52:37 -040020from . import base_tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020021from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022from . import events
23from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020024from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026
def current_task(loop=None):
    """Return the task currently executing on *loop*.

    When *loop* is None the running event loop is used, obtained via
    events.get_running_loop().  The lookup is a plain ``.get()`` on the
    module-level ``_current_tasks`` mapping, so None is returned when no
    entry is recorded for the loop.
    """
    target = events.get_running_loop() if loop is None else loop
    return _current_tasks.get(target)
32
33
def all_tasks(loop=None):
    """Return a set of all not-yet-done tasks for the loop.

    When *loop* is None the running event loop is used, obtained via
    events.get_running_loop().  Only tasks whose loop (as reported by
    futures._get_loop()) is *loop* and for which done() is false are
    included.

    Raises RuntimeError if the task registry could not be snapshotted
    after repeated attempts.
    """
    if loop is None:
        loop = events.get_running_loop()
    # NB: the registry is copied with list() before filtering to protect
    # from the https://bugs.python.org/issue34970 bug.  The copy itself can
    # still fail with RuntimeError if the registry changes size while it is
    # being iterated (presumably it is a weak set mutated from another
    # thread -- confirm against its definition), so retry a bounded number
    # of times before giving up.
    attempts = 0
    while True:
        try:
            tasks = list(_all_tasks)
        except RuntimeError:
            attempts += 1
            if attempts >= 1000:
                raise
        else:
            break
    return {t for t in tasks
            if futures._get_loop(t) is loop and not t.done()}
42
43
def _all_tasks_compat(loop=None):
    """Return a set of *all* tasks for the loop, including done ones.

    Differs from all_tasks() by not filtering out completed tasks and by
    falling back to events.get_event_loop() when *loop* is None.  Used to
    implement the deprecated Task.all_tasks() class method.

    Raises RuntimeError if the task registry could not be snapshotted
    after repeated attempts.
    """
    if loop is None:
        loop = events.get_event_loop()
    # NB: the registry is copied with list() before filtering to protect
    # from the https://bugs.python.org/issue34970 bug.  The copy itself can
    # still fail with RuntimeError if the registry changes size while it is
    # being iterated (presumably it is a weak set mutated from another
    # thread -- confirm against its definition), so retry a bounded number
    # of times before giving up.
    attempts = 0
    while True:
        try:
            tasks = list(_all_tasks)
        except RuntimeError:
            attempts += 1
            if attempts >= 1000:
                raise
        else:
            break
    return {t for t in tasks if futures._get_loop(t) is loop}
Andrew Svetlov44d1a592017-12-16 21:58:38 +020053
54
class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # Invariant that holds for as long as a Task is not done:
    #
    # - either _fut_waiter is None and __step() is scheduled;
    # - or _fut_waiter is some Future and __step() is *not* scheduled.
    #
    # The only way from the second state back to the first is through
    # __wakeup().  Whenever _fut_waiter is not None, __wakeup() must be
    # one of its callbacks.

    # When False, __del__ does not report a task that is destroyed while
    # its state is still pending.
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.

        Deprecated: emits PendingDeprecationWarning and delegates to the
        module-level current_task().
        """
        warnings.warn("Task.current_task() is deprecated, "
                      "use asyncio.current_task() instead",
                      PendingDeprecationWarning,
                      stacklevel=2)
        target = events.get_event_loop() if loop is None else loop
        return current_task(target)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.

        Deprecated: emits PendingDeprecationWarning and delegates to
        _all_tasks_compat().
        """
        warnings.warn("Task.all_tasks() is deprecated, "
                      "use asyncio.all_tasks() instead",
                      PendingDeprecationWarning,
                      stacklevel=2)
        return _all_tasks_compat(loop)

    def __init__(self, coro, *, loop=None):
        """Wrap *coro* and schedule its first step on the loop.

        Raises TypeError if *coro* is not a coroutine according to
        coroutines.iscoroutine().
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the innermost recorded frame.
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # Raise only after Future.__init__() has run: __del__ needs the
            # attributes it sets.  Also silence the "pending task
            # destroyed" report for this half-built object.
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        self._coro = coro
        self._fut_waiter = None
        self._must_cancel = False
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    def __del__(self):
        still_pending = self._state == futures._PENDING
        if still_pending and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    def _repr_info(self):
        return base_tasks._task_repr_info(self)

    def set_result(self, result):
        # A Task's outcome is set by __step(), never from outside.
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        # A Task's outcome is set by __step(), never from outside.
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        self._log_traceback = False
        if self.done():
            return False
        awaited = self._fut_waiter
        if awaited is not None and awaited.cancel():
            # Leave self._fut_waiter in place: it may be a Task that
            # catches and ignores the cancellation, so we may have to
            # cancel it again later.
            return True
        # Otherwise self.__step must already be scheduled; have it throw
        # CancelledError into the coroutine when it runs.
        self._must_cancel = True
        return True

    def __step(self, exc=None):
        if self.done():
            raise futures.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        try:
            # Resume the coroutine: throw *exc* into it when one is
            # pending, otherwise send None.  `send` is called directly
            # because coroutines don't have `__iter__` and `__next__`.
            result = coro.send(None) if exc is None else coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().set_exception(futures.CancelledError())
            else:
                super().set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            super().set_exception(exc)
        except BaseException as exc:
            super().set_exception(exc)
            raise
        else:
            # The coroutine yielded *result*.  Either start waiting on it,
            # reschedule a plain step, or build an error that is thrown
            # into the coroutine on the next step.
            error = None
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    error = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                elif not blocking:
                    error = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                elif result is self:
                    error = RuntimeError(
                        f'Task cannot await on itself: {self!r}')
                else:
                    result._asyncio_future_blocking = False
                    result.add_done_callback(
                        self.__wakeup, context=self._context)
                    self._fut_waiter = result
                    if self._must_cancel:
                        if self._fut_waiter.cancel():
                            self._must_cancel = False
            elif result is None:
                # Bare yield relinquishes control for one event loop
                # iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                error = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
            else:
                # Yielding something else is an error.
                error = RuntimeError(f'Task got bad yield: {result!r}')

            if error is not None:
                self._loop.call_soon(
                    self.__step, error, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        try:
            future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Deliberately do not forward the value of `future.result()`:
            # `Future.__iter__` and `Future.__await__` don't need it, and
            # passing a value would make the eval loop use `.send(value)`
            # instead of `__next__()`, which is slower for futures that
            # return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.
305
306
# Keep a handle on the pure-Python implementation before the public name
# is possibly rebound below.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    # No _asyncio module available: Task stays the Python class.
    pass
else:
    # _CTask is needed for tests.
    _CTask = _asyncio.Task
    Task = _CTask
317
318
def create_task(coro):
    """Schedule the execution of a coroutine object in a new task.

    The task is created on the running event loop, obtained via
    events.get_running_loop().

    Return a Task object.
    """
    return events.get_running_loop().create_task(coro)
326
327
# wait() and as_completed() are similar to those in PEP 3148; the
# return_when constants are re-exported from concurrent.futures.

FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED = (
    concurrent.futures.FIRST_COMPLETED,
    concurrent.futures.FIRST_EXCEPTION,
    concurrent.futures.ALL_COMPLETED,
)
333
334
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures and coroutines given by fs to complete.

    The sequence futures must not be empty.

    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = await asyncio.wait(fs)

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.

    Raises TypeError when fs is itself a future or coroutine, and
    ValueError when fs is empty or return_when is not one of
    FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    accepted = (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED)
    if return_when not in accepted:
        raise ValueError(f'Invalid return_when value: {return_when}')

    if loop is None:
        loop = events.get_event_loop()

    # De-duplicate the inputs first, then make sure each one is a future
    # bound to *loop*.
    wrapped = set()
    for item in set(fs):
        wrapped.add(ensure_future(item, loop=loop))

    return await _wait(wrapped, timeout, return_when, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700364
365
def _release_waiter(waiter, *args):
    """Complete *waiter* with a None result unless it is already done.

    Extra positional arguments are accepted and ignored, which lets this
    be used both as a timer callback and (via functools.partial) as a
    future done-callback.
    """
    if waiter.done():
        return
    waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700369
370
async def wait_for(fut, timeout, *, loop=None):
    """Wait for the single Future or coroutine to complete, with timeout.

    Coroutine will be wrapped in Task.

    Returns result of the Future or coroutine.  When a timeout occurs,
    it cancels the task and raises TimeoutError.  To avoid the task
    cancellation, wrap it in shield().

    If the wait is cancelled, the task is also cancelled.

    A timeout of None awaits *fut* directly with no time limit.  A
    timeout <= 0 returns the result only if *fut* is already done and
    otherwise cancels it and raises TimeoutError immediately.

    This function is a coroutine.
    """
    if loop is None:
        loop = events.get_event_loop()

    if timeout is None:
        return await fut

    if timeout <= 0:
        fut = ensure_future(fut, loop=loop)

        if fut.done():
            return fut.result()

        fut.cancel()
        raise futures.TimeoutError()

    # *gate* is completed by whichever happens first: the timer firing or
    # *fut* finishing (through the done-callback).
    gate = loop.create_future()
    timer = loop.call_later(timeout, _release_waiter, gate)
    on_done = functools.partial(_release_waiter, gate)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(on_done)

    try:
        # Wait until the future completes or the timeout expires.
        try:
            await gate
        except futures.CancelledError:
            # The wait itself was cancelled: propagate to the inner future.
            fut.remove_done_callback(on_done)
            fut.cancel()
            raise

        if fut.done():
            return fut.result()

        # Timed out.
        fut.remove_done_callback(on_done)
        # We must ensure that the task is not running
        # after wait_for() returns.
        # See https://bugs.python.org/issue32751
        await _cancel_and_wait(fut, loop=loop)
        raise futures.TimeoutError()
    finally:
        timer.cancel()
426
427
async def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a collection of Futures.

    Returns a (done, pending) pair of sets partitioning fs by done().
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    timer = None
    if timeout is not None:
        timer = loop.call_later(timeout, _release_waiter, waiter)
    remaining = len(fs)

    def _on_completion(f):
        nonlocal remaining
        remaining -= 1
        # Decide whether this completion satisfies return_when.
        # f.exception() is consulted only for FIRST_EXCEPTION, and only
        # when f was not cancelled.
        satisfied = remaining <= 0 or return_when == FIRST_COMPLETED
        if not satisfied and return_when == FIRST_EXCEPTION:
            satisfied = not f.cancelled() and f.exception() is not None
        if not satisfied:
            return
        if timer is not None:
            timer.cancel()
        if not waiter.done():
            waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        await waiter
    finally:
        # Always undo the timer and the callbacks, including when this
        # coroutine is cancelled while waiting.
        if timer is not None:
            timer.cancel()
        for f in fs:
            f.remove_done_callback(_on_completion)

    done, pending = set(), set()
    for f in fs:
        (done if f.done() else pending).add(f)
    return done, pending
470
471
async def _cancel_and_wait(fut, loop):
    """Cancel the *fut* future or task and wait until it completes."""

    gate = loop.create_future()
    release = functools.partial(_release_waiter, gate)
    fut.add_done_callback(release)

    try:
        fut.cancel()
        # Await a separate future rather than *fut* itself, to make
        # sure _cancel_and_wait itself is reliably cancellable.
        await gate
    finally:
        fut.remove_done_callback(release)
486
487
# This is *not* a @coroutine!  It is a plain generator whose items are
# coroutine objects to be awaited.
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    When waiting for the yielded coroutines you'll get the results (or
    exceptions!) of the original Futures (or coroutines), in the order
    in which and as soon as they complete.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = await f  # The 'await' may raise.
            # Use result.

    If a timeout is specified, the 'await' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if loop is None:
        loop = events.get_event_loop()
    outstanding = {ensure_future(f, loop=loop) for f in set(fs)}
    from .queues import Queue  # Import here to avoid circular import problem.
    finished = Queue(loop=loop)
    timer = None

    def _on_timeout():
        for fut in outstanding:
            fut.remove_done_callback(_on_completion)
        finished.put_nowait(None)  # Queue a dummy value for _wait_for_one().
        outstanding.clear()  # Can't remove items while iterating above.

    def _on_completion(fut):
        if not outstanding:
            return  # _on_timeout() was here first.
        outstanding.remove(fut)
        finished.put_nowait(fut)
        if not outstanding and timer is not None:
            timer.cancel()

    async def _wait_for_one():
        fut = await finished.get()
        if fut is None:
            # Dummy value from _on_timeout().
            raise futures.TimeoutError
        return fut.result()  # May raise fut.exception().

    for fut in outstanding:
        fut.add_done_callback(_on_completion)
    if outstanding and timeout is not None:
        timer = loop.call_later(timeout, _on_timeout)
    for _ in range(len(outstanding)):
        yield _wait_for_one()
542
543
@types.coroutine
def __sleep0():
    """Skip one event loop run cycle.

    Private helper for 'asyncio.sleep()', used when the 'delay' is
    not positive.  Instead of creating a Future object it performs a
    bare 'yield', which Task.__step knows how to handle by scheduling
    the next step.
    """
    yield None
554
555
async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    Returns *result*.  For delay <= 0 no timer is created: control is
    yielded to the event loop once and *result* is returned.
    """
    if delay <= 0:
        await __sleep0()
        return result

    if loop is None:
        loop = events.get_event_loop()
    wakeup = loop.create_future()
    timer = loop.call_later(delay,
                            futures._set_result_unless_cancelled,
                            wakeup, result)
    try:
        return await wakeup
    finally:
        # Cancel the timer on every exit path, including cancellation of
        # the sleeping coroutine.
        timer.cancel()
572
573
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.

    A coroutine is scheduled as a task on *loop* (or on
    events.get_event_loop() when *loop* is None).  Any other awaitable
    is first wrapped in a coroutine and then handled the same way.

    Raises ValueError when a Future is passed together with a *loop*
    different from the Future's own, and TypeError when the argument
    is none of the supported kinds.
    """
    if coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            # Drop the innermost recorded frame.
            del task._source_traceback[-1]
        return task

    if futures.isfuture(coro_or_future):
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('loop argument must agree with Future')
        return coro_or_future

    if inspect.isawaitable(coro_or_future):
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)

    raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                    'required')
Yury Selivanov620279b2015-10-02 15:00:19 -0400595
596
@coroutine
def _wrap_awaitable(awaitable):
    """Helper for asyncio.ensure_future().

    Turns an awaitable (an object with __await__) into a coroutine by
    delegating to the iterator returned from its __await__(); the
    coroutine is later wrapped in a Task by ensure_future().
    """
    outcome = yield from awaitable.__await__()
    return outcome
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700605
606
class _GatheringFuture(futures.Future):
    """Helper for gather().

    This overrides cancel() to cancel all the children and act more
    like Task.cancel(), which doesn't immediately mark itself as
    cancelled.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children
        # Set by cancel() once at least one child accepted cancellation.
        self._cancel_requested = False

    def cancel(self):
        """Cancel every child; return True if any child accepted.

        Returns False without touching the children when this future is
        already done.
        """
        if self.done():
            return False
        # Call cancel() on *every* child (no short-circuiting), then
        # check whether any of them reported success.
        outcomes = [child.cancel() for child in self._children]
        cancelled_any = any(outcomes)
        if cancelled_any:
            # If any child tasks were actually cancelled, we should
            # propagate the cancellation request regardless of
            # *return_exceptions* argument.  See issue 32684.
            self._cancel_requested = True
        return cancelled_any
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700633
634
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines/futures.

    Coroutines will be wrapped in a future and scheduled in the event
    loop. They will not necessarily be scheduled in the same order as
    passed in.

    All futures must share the same event loop.  If all the tasks are
    done successfully, the returned future's result is the list of
    results (in the order of the original sequence, not necessarily
    the order of results arrival).  If *return_exceptions* is True,
    exceptions in the tasks are treated the same as successful
    results, and gathered in the result list; otherwise, the first
    raised exception will be immediately propagated to the returned
    future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)

    If one of the arguments cannot be converted by ensure_future(), the
    exception raised by ensure_future() propagates to the caller and no
    outer future is created.  Children already set up for earlier
    arguments keep running; their completion is then silently consumed.
    """
    if not coros_or_futures:
        if loop is None:
            loop = events.get_event_loop()
        outer = loop.create_future()
        outer.set_result([])
        return outer

    def _done_callback(fut):
        nonlocal nfinished
        nfinished += 1

        # 'outer' is None when gather() itself failed part-way through
        # the argument loop below (e.g. an invalid argument made
        # ensure_future() raise) after this callback had already been
        # attached to an earlier child.  There is no outer future to
        # report to, so behave as if it were already done.
        if outer is None or outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if not return_exceptions:
            if fut.cancelled():
                # Check if 'fut' is cancelled first, as
                # 'fut.exception()' will *raise* a CancelledError
                # instead of returning it.
                exc = futures.CancelledError()
                outer.set_exception(exc)
                return
            else:
                exc = fut.exception()
                if exc is not None:
                    outer.set_exception(exc)
                    return

        if nfinished == nfuts:
            # All futures are done; create a list of results
            # and set it to the 'outer' future.
            results = []

            for fut in children:
                if fut.cancelled():
                    # Check if 'fut' is cancelled first, as
                    # 'fut.exception()' will *raise* a CancelledError
                    # instead of returning it.
                    res = futures.CancelledError()
                else:
                    res = fut.exception()
                    if res is None:
                        res = fut.result()
                results.append(res)

            if outer._cancel_requested:
                # If gather is being cancelled we must propagate the
                # cancellation regardless of *return_exceptions* argument.
                # See issue 32684.
                outer.set_exception(futures.CancelledError())
            else:
                outer.set_result(results)

    arg_to_fut = {}
    children = []
    nfuts = 0
    nfinished = 0
    # Bind 'outer' before any callback can be registered so that
    # _done_callback never hits an unbound closure variable if the loop
    # below raises (bpo-46672).
    outer = None
    for arg in coros_or_futures:
        if arg not in arg_to_fut:
            fut = ensure_future(arg, loop=loop)
            if loop is None:
                loop = futures._get_loop(fut)
            if fut is not arg:
                # 'arg' was not a Future, therefore, 'fut' is a new
                # Future created specifically for 'arg'.  Since the caller
                # can't control it, disable the "destroy pending task"
                # warning.
                fut._log_destroy_pending = False

            nfuts += 1
            arg_to_fut[arg] = fut
            fut.add_done_callback(_done_callback)

        else:
            # There's a duplicate Future object in coros_or_futures.
            fut = arg_to_fut[arg]

        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    return outer
742
743
def shield(arg, *, loop=None):
    """Wait for a future while protecting it from cancellation.

    Writing

        res = await shield(something())

    behaves like

        res = await something()

    with one difference: when the coroutine performing the await is
    cancelled, the task running something() is left alone.  As far as
    something() can tell, no cancellation took place.  The awaiting
    caller is nevertheless cancelled, so its await expression still
    raises CancelledError.  Note: if something() is cancelled by any
    other means, shield() is cancelled as well.

    To ignore cancellation entirely (not recommended), combine
    shield() with a try/except clause:

        try:
            res = await shield(something())
        except CancelledError:
            res = None
    """
    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Nothing left to protect; hand the finished future straight back.
        return inner

    loop = futures._get_loop(inner)
    outer = loop.create_future()

    def _inner_done_callback(finished):
        # The caller already gave up on the outer future: just make sure
        # the inner outcome counts as retrieved.
        if outer.cancelled():
            if not finished.cancelled():
                finished.exception()
            return

        # Otherwise mirror the inner outcome onto the outer future.
        if finished.cancelled():
            outer.cancel()
            return

        error = finished.exception()
        if error is None:
            outer.set_result(finished.result())
        else:
            outer.set_exception(error)

    def _outer_done_callback(_completed):
        # Once the outer future is done there is nothing left to mirror,
        # so detach from an inner future that is still running.
        if not inner.done():
            inner.remove_done_callback(_inner_done_callback)

    inner.add_done_callback(_inner_done_callback)
    outer.add_done_callback(_outer_done_callback)
    return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700801
802
def run_coroutine_threadsafe(coro, loop):
    """Schedule a coroutine object on the given event loop.

    The scheduling request is handed to the loop through
    call_soon_threadsafe().  A concurrent.futures.Future is returned
    through which the outcome can be accessed.

    Raises TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')

    result_future = concurrent.futures.Future()

    def callback():
        try:
            task = ensure_future(coro, loop=loop)
            futures._chain_future(task, result_future)
        except Exception as exc:
            # Report the failure through the returned future (unless it
            # was cancelled in the meantime), then let it propagate.
            if result_future.set_running_or_notify_cancel():
                result_future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return result_future
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200822
823
# WeakSet containing all alive tasks.  Being a WeakSet, it does not
# by itself keep the registered tasks alive.
_all_tasks = weakref.WeakSet()

# Dictionary containing tasks that are currently active in
# all running event loops.  {EventLoop: Task}
# A loop has an entry only while one of its tasks is being executed.
_current_tasks = {}
830
831
def _register_task(task):
    """Add *task* to the module-wide set of known tasks."""
    _all_tasks.add(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200835
836
def _enter_task(loop, task):
    """Record *task* as the task currently executed by *loop*.

    Raises RuntimeError if another task is already recorded as
    current for that loop.
    """
    current_task = _current_tasks.get(loop)
    if current_task is None:
        _current_tasks[loop] = task
        return
    raise RuntimeError(f"Cannot enter into task {task!r} while another "
                       f"task {current_task!r} is being executed.")
843
844
def _leave_task(loop, task):
    """Clear *task* as the task currently executed by *loop*.

    Raises RuntimeError if *task* is not the task recorded as current
    for that loop.
    """
    current_task = _current_tasks.get(loop)
    if current_task is task:
        del _current_tasks[loop]
        return
    raise RuntimeError(f"Leaving task {task!r} does not match "
                       f"the current task {current_task!r}.")
851
852
def _unregister_task(task):
    """Remove *task* from the set of known tasks, if it is present."""
    _all_tasks.discard(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200856
857
# Keep references to the pure-Python implementations under "_py_" names:
# the import from _asyncio below may rebind the unprefixed names.
_py_register_task = _register_task
_py_unregister_task = _unregister_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task
862
863
# Use the implementations provided by the _asyncio extension module when
# it can be imported; on success the import rebinds the task bookkeeping
# functions and the _all_tasks / _current_tasks containers.
try:
    from _asyncio import (_register_task, _unregister_task,
                          _enter_task, _leave_task,
                          _all_tasks, _current_tasks)
except ImportError:
    # _asyncio is unavailable: the pure-Python definitions stay in effect.
    pass
else:
    # Also expose the imported versions under explicit "_c_" names,
    # mirroring the "_py_" aliases of the pure-Python versions.
    _c_register_task = _register_task
    _c_unregister_task = _unregister_task
    _c_enter_task = _enter_task
    _c_leave_task = _leave_task