blob: 03d71d37f01afd132705d7fe0870c4771e14f7f5 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
Yury Selivanov6370f342017-12-10 18:36:12 -05003__all__ = (
Andrew Svetlovf74ef452017-12-15 07:04:38 +02004 'Task', 'create_task',
Yury Selivanov6370f342017-12-10 18:36:12 -05005 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
Yury Selivanov9edad3c2017-12-11 10:03:48 -05006 'wait', 'wait_for', 'as_completed', 'sleep',
Yury Selivanov6370f342017-12-10 18:36:12 -05007 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
Andrew Svetlov44d1a592017-12-16 21:58:38 +02008 'current_task', 'all_tasks',
9 '_register_task', '_unregister_task', '_enter_task', '_leave_task',
Yury Selivanov6370f342017-12-10 18:36:12 -050010)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import concurrent.futures
Yury Selivanovf23746a2018-01-22 19:11:18 -050013import contextvars
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import functools
15import inspect
Alex Grönholmcca4eec2018-08-09 00:06:47 +030016import itertools
Andrew Svetlov5f841b52017-12-09 00:23:48 +020017import types
Yury Selivanov59eb9a42015-05-11 14:48:38 -040018import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019import weakref
20
Yury Selivanova0c1ba62016-10-28 12:52:37 -040021from . import base_tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020022from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import events
24from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020025from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026
Alex Grönholmcca4eec2018-08-09 00:06:47 +030027# Helper to generate new task names
28# This uses itertools.count() instead of a "+= 1" operation because the latter
29# is not thread safe. See bpo-11866 for a longer explanation.
30_task_name_counter = itertools.count(1).__next__
31
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070032
Andrew Svetlov44d1a592017-12-16 21:58:38 +020033def current_task(loop=None):
34 """Return a currently executed task."""
35 if loop is None:
36 loop = events.get_running_loop()
37 return _current_tasks.get(loop)
38
39
40def all_tasks(loop=None):
41 """Return a set of all tasks for the loop."""
42 if loop is None:
Yury Selivanov416c1eb2018-05-28 17:54:02 -040043 loop = events.get_running_loop()
44 return {t for t in _all_tasks
45 if futures._get_loop(t) is loop and not t.done()}
46
47
48def _all_tasks_compat(loop=None):
49 # Different from "all_task()" by returning *all* Tasks, including
50 # the completed ones. Used to implement deprecated "Tasks.all_task()"
51 # method.
52 if loop is None:
Andrew Svetlov44d1a592017-12-16 21:58:38 +020053 loop = events.get_event_loop()
Yury Selivanovca9b36c2017-12-23 15:04:15 -050054 return {t for t in _all_tasks if futures._get_loop(t) is loop}
Andrew Svetlov44d1a592017-12-16 21:58:38 +020055
56
Alex Grönholmcca4eec2018-08-09 00:06:47 +030057def _set_task_name(task, name):
58 if name is not None:
59 try:
60 set_name = task.set_name
61 except AttributeError:
62 pass
63 else:
64 set_name(name)
65
66
Yury Selivanov0cf16f92017-12-25 10:48:15 -050067class Task(futures._PyFuture): # Inherit Python Task implementation
68 # from a Python Future implementation.
69
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070070 """A coroutine wrapped in a Future."""
71
72 # An important invariant maintained while a Task not done:
73 #
74 # - Either _fut_waiter is None, and _step() is scheduled;
75 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
76 #
77 # The only transition from the latter to the former is through
78 # _wakeup(). When _fut_waiter is not None, one of its callbacks
79 # must be _wakeup().
80
Victor Stinnerfe22e092014-12-04 23:00:13 +010081 # If False, don't log a message if the task is destroyed whereas its
82 # status is still pending
83 _log_destroy_pending = True
84
Guido van Rossum1a605ed2013-12-06 12:57:40 -080085 @classmethod
86 def current_task(cls, loop=None):
87 """Return the currently running task in an event loop or None.
88
89 By default the current task for the current event loop is returned.
90
91 None is returned when called not in the context of a Task.
92 """
Andrew Svetlov44d1a592017-12-16 21:58:38 +020093 warnings.warn("Task.current_task() is deprecated, "
94 "use asyncio.current_task() instead",
95 PendingDeprecationWarning,
96 stacklevel=2)
Guido van Rossum1a605ed2013-12-06 12:57:40 -080097 if loop is None:
98 loop = events.get_event_loop()
Andrew Svetlov44d1a592017-12-16 21:58:38 +020099 return current_task(loop)
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800100
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700101 @classmethod
102 def all_tasks(cls, loop=None):
103 """Return a set of all tasks for an event loop.
104
105 By default all tasks for the current event loop are returned.
106 """
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200107 warnings.warn("Task.all_tasks() is deprecated, "
108 "use asyncio.all_tasks() instead",
109 PendingDeprecationWarning,
110 stacklevel=2)
Yury Selivanov416c1eb2018-05-28 17:54:02 -0400111 return _all_tasks_compat(loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700112
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300113 def __init__(self, coro, *, loop=None, name=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700114 super().__init__(loop=loop)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200115 if self._source_traceback:
116 del self._source_traceback[-1]
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200117 if not coroutines.iscoroutine(coro):
118 # raise after Future.__init__(), attrs are required for __del__
119 # prevent logging for pending task in __del__
120 self._log_destroy_pending = False
121 raise TypeError(f"a coroutine was expected, got {coro!r}")
122
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300123 if name is None:
124 self._name = f'Task-{_task_name_counter()}'
125 else:
126 self._name = str(name)
127
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700128 self._must_cancel = False
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200129 self._fut_waiter = None
130 self._coro = coro
Yury Selivanovf23746a2018-01-22 19:11:18 -0500131 self._context = contextvars.copy_context()
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200132
Yury Selivanov22feeb82018-01-24 11:31:01 -0500133 self._loop.call_soon(self.__step, context=self._context)
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500134 _register_task(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700135
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900136 def __del__(self):
137 if self._state == futures._PENDING and self._log_destroy_pending:
138 context = {
139 'task': self,
140 'message': 'Task was destroyed but it is pending!',
141 }
142 if self._source_traceback:
143 context['source_traceback'] = self._source_traceback
144 self._loop.call_exception_handler(context)
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500145 super().__del__()
Victor Stinnera02f81f2014-06-24 22:37:53 +0200146
Victor Stinner313a9802014-07-29 12:58:23 +0200147 def _repr_info(self):
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400148 return base_tasks._task_repr_info(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700149
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300150 def get_name(self):
151 return self._name
152
153 def set_name(self, value):
154 self._name = str(value)
155
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500156 def set_result(self, result):
157 raise RuntimeError('Task does not support set_result operation')
158
159 def set_exception(self, exception):
160 raise RuntimeError('Task does not support set_exception operation')
161
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700162 def get_stack(self, *, limit=None):
163 """Return the list of stack frames for this task's coroutine.
164
Victor Stinnerd87de832014-12-02 17:57:04 +0100165 If the coroutine is not done, this returns the stack where it is
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700166 suspended. If the coroutine has completed successfully or was
167 cancelled, this returns an empty list. If the coroutine was
168 terminated by an exception, this returns the list of traceback
169 frames.
170
171 The frames are always ordered from oldest to newest.
172
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500173 The optional limit gives the maximum number of frames to
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700174 return; by default all available frames are returned. Its
175 meaning differs depending on whether a stack or a traceback is
176 returned: the newest frames of a stack are returned, but the
177 oldest frames of a traceback are returned. (This matches the
178 behavior of the traceback module.)
179
180 For reasons beyond our control, only one stack frame is
181 returned for a suspended coroutine.
182 """
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400183 return base_tasks._task_get_stack(self, limit)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700184
185 def print_stack(self, *, limit=None, file=None):
186 """Print the stack or traceback for this task's coroutine.
187
188 This produces output similar to that of the traceback module,
189 for the frames retrieved by get_stack(). The limit argument
190 is passed to get_stack(). The file argument is an I/O stream
R David Murray8e069d52014-09-24 13:13:45 -0400191 to which the output is written; by default output is written
192 to sys.stderr.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700193 """
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400194 return base_tasks._task_print_stack(self, limit, file)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700195
196 def cancel(self):
R David Murray8e069d52014-09-24 13:13:45 -0400197 """Request that this task cancel itself.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200198
Victor Stinner8d213572014-06-02 23:06:46 +0200199 This arranges for a CancelledError to be thrown into the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200200 wrapped coroutine on the next cycle through the event loop.
201 The coroutine then has a chance to clean up or even deny
202 the request using try/except/finally.
203
R David Murray8e069d52014-09-24 13:13:45 -0400204 Unlike Future.cancel, this does not guarantee that the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200205 task will be cancelled: the exception might be caught and
R David Murray8e069d52014-09-24 13:13:45 -0400206 acted upon, delaying cancellation of the task or preventing
207 cancellation completely. The task may also return a value or
208 raise a different exception.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200209
210 Immediately after this method is called, Task.cancelled() will
211 not return True (unless the task was already cancelled). A
212 task will be marked as cancelled when the wrapped coroutine
213 terminates with a CancelledError exception (even if cancel()
214 was not called).
215 """
Yury Selivanov7ce1c6f2017-06-11 13:49:18 +0000216 self._log_traceback = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700217 if self.done():
218 return False
219 if self._fut_waiter is not None:
220 if self._fut_waiter.cancel():
221 # Leave self._fut_waiter; it may be a Task that
222 # catches and ignores the cancellation so we may have
223 # to cancel it again later.
224 return True
Yury Selivanov22feeb82018-01-24 11:31:01 -0500225 # It must be the case that self.__step is already scheduled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700226 self._must_cancel = True
227 return True
228
Yury Selivanov22feeb82018-01-24 11:31:01 -0500229 def __step(self, exc=None):
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500230 if self.done():
231 raise futures.InvalidStateError(
232 f'_step(): already done: {self!r}, {exc!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700233 if self._must_cancel:
234 if not isinstance(exc, futures.CancelledError):
235 exc = futures.CancelledError()
236 self._must_cancel = False
237 coro = self._coro
238 self._fut_waiter = None
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800239
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200240 _enter_task(self._loop, self)
Yury Selivanovd59bba82015-11-20 12:41:03 -0500241 # Call either coro.throw(exc) or coro.send(None).
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700242 try:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500243 if exc is None:
244 # We use the `send` method directly, because coroutines
245 # don't have `__iter__` and `__next__` methods.
246 result = coro.send(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700247 else:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500248 result = coro.throw(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700249 except StopIteration as exc:
INADA Naoki991adca2017-05-11 21:18:38 +0900250 if self._must_cancel:
251 # Task is cancelled right before coro stops.
252 self._must_cancel = False
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500253 super().set_exception(futures.CancelledError())
INADA Naoki991adca2017-05-11 21:18:38 +0900254 else:
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500255 super().set_result(exc.value)
Yury Selivanov4145c832016-10-09 12:19:12 -0400256 except futures.CancelledError:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700257 super().cancel() # I.e., Future.cancel(self).
258 except Exception as exc:
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500259 super().set_exception(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700260 except BaseException as exc:
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500261 super().set_exception(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700262 raise
263 else:
Guido van Rossum1140a032016-09-09 12:54:54 -0700264 blocking = getattr(result, '_asyncio_future_blocking', None)
265 if blocking is not None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700266 # Yielded Future must come from Future.__iter__().
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500267 if futures._get_loop(result) is not self._loop:
Yury Selivanov6370f342017-12-10 18:36:12 -0500268 new_exc = RuntimeError(
269 f'Task {self!r} got Future '
270 f'{result!r} attached to a different loop')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500271 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500272 self.__step, new_exc, context=self._context)
Guido van Rossum1140a032016-09-09 12:54:54 -0700273 elif blocking:
Yury Selivanov4145c832016-10-09 12:19:12 -0400274 if result is self:
Yury Selivanov6370f342017-12-10 18:36:12 -0500275 new_exc = RuntimeError(
276 f'Task cannot await on itself: {self!r}')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500277 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500278 self.__step, new_exc, context=self._context)
Yury Selivanov4145c832016-10-09 12:19:12 -0400279 else:
280 result._asyncio_future_blocking = False
Yury Selivanovf23746a2018-01-22 19:11:18 -0500281 result.add_done_callback(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500282 self.__wakeup, context=self._context)
Yury Selivanov4145c832016-10-09 12:19:12 -0400283 self._fut_waiter = result
284 if self._must_cancel:
285 if self._fut_waiter.cancel():
286 self._must_cancel = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700287 else:
Yury Selivanov6370f342017-12-10 18:36:12 -0500288 new_exc = RuntimeError(
289 f'yield was used instead of yield from '
290 f'in task {self!r} with {result!r}')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500291 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500292 self.__step, new_exc, context=self._context)
Yury Selivanov6370f342017-12-10 18:36:12 -0500293
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700294 elif result is None:
295 # Bare yield relinquishes control for one event loop iteration.
Yury Selivanov22feeb82018-01-24 11:31:01 -0500296 self._loop.call_soon(self.__step, context=self._context)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700297 elif inspect.isgenerator(result):
298 # Yielding a generator is just wrong.
Yury Selivanov6370f342017-12-10 18:36:12 -0500299 new_exc = RuntimeError(
300 f'yield was used instead of yield from for '
Serhiy Storchaka66553542018-05-20 16:30:31 +0300301 f'generator in task {self!r} with {result!r}')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500302 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500303 self.__step, new_exc, context=self._context)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700304 else:
305 # Yielding something else is an error.
Yury Selivanov6370f342017-12-10 18:36:12 -0500306 new_exc = RuntimeError(f'Task got bad yield: {result!r}')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500307 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500308 self.__step, new_exc, context=self._context)
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800309 finally:
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200310 _leave_task(self._loop, self)
Victor Stinnerd74ac822014-03-04 23:07:08 +0100311 self = None # Needed to break cycles when an exception occurs.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700312
Yury Selivanov22feeb82018-01-24 11:31:01 -0500313 def __wakeup(self, future):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700314 try:
Yury Selivanova4afc482015-11-16 15:12:10 -0500315 future.result()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700316 except Exception as exc:
317 # This may also be a cancellation.
Yury Selivanov22feeb82018-01-24 11:31:01 -0500318 self.__step(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700319 else:
Yury Selivanova4afc482015-11-16 15:12:10 -0500320 # Don't pass the value of `future.result()` explicitly,
321 # as `Future.__iter__` and `Future.__await__` don't need it.
322 # If we call `_step(value, None)` instead of `_step()`,
323 # Python eval loop would use `.send(value)` method call,
324 # instead of `__next__()`, which is slower for futures
325 # that return non-generator iterators from their `__iter__`.
Yury Selivanov22feeb82018-01-24 11:31:01 -0500326 self.__step()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700327 self = None # Needed to break cycles when an exception occurs.
328
329
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400330_PyTask = Task
331
332
333try:
334 import _asyncio
335except ImportError:
336 pass
337else:
338 # _CTask is needed for tests.
339 Task = _CTask = _asyncio.Task
340
341
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300342def create_task(coro, *, name=None):
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200343 """Schedule the execution of a coroutine object in a spawn task.
344
345 Return a Task object.
346 """
347 loop = events.get_running_loop()
Alex Grönholmcca4eec2018-08-09 00:06:47 +0300348 task = loop.create_task(coro)
349 _set_task_name(task, name)
350 return task
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200351
352
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700353# wait() and as_completed() similar to those in PEP 3148.
354
355FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
356FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
357ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
358
359
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200360async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700361 """Wait for the Futures and coroutines given by fs to complete.
362
Victor Stinnerdb74d982014-06-10 11:16:05 +0200363 The sequence futures must not be empty.
364
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700365 Coroutines will be wrapped in Tasks.
366
367 Returns two sets of Future: (done, pending).
368
369 Usage:
370
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200371 done, pending = await asyncio.wait(fs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700372
373 Note: This does not raise TimeoutError! Futures that aren't done
374 when the timeout occurs are returned in the second set.
375 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700376 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Yury Selivanov6370f342017-12-10 18:36:12 -0500377 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700378 if not fs:
379 raise ValueError('Set of coroutines/Futures is empty.')
Victor Stinnere931f7b2014-07-16 18:50:39 +0200380 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
Yury Selivanov6370f342017-12-10 18:36:12 -0500381 raise ValueError(f'Invalid return_when value: {return_when}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700382
383 if loop is None:
384 loop = events.get_event_loop()
385
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400386 fs = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700387
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200388 return await _wait(fs, timeout, return_when, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700389
390
Victor Stinner59e08022014-08-28 11:19:25 +0200391def _release_waiter(waiter, *args):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700392 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200393 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700394
395
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200396async def wait_for(fut, timeout, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700397 """Wait for the single Future or coroutine to complete, with timeout.
398
399 Coroutine will be wrapped in Task.
400
Victor Stinner421e49b2014-01-23 17:40:59 +0100401 Returns result of the Future or coroutine. When a timeout occurs,
402 it cancels the task and raises TimeoutError. To avoid the task
403 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700404
Victor Stinner922bc2c2015-01-15 16:29:10 +0100405 If the wait is cancelled, the task is also cancelled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700406
Victor Stinner922bc2c2015-01-15 16:29:10 +0100407 This function is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700408 """
409 if loop is None:
410 loop = events.get_event_loop()
411
Guido van Rossum48c66c32014-01-29 14:30:38 -0800412 if timeout is None:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200413 return await fut
Guido van Rossum48c66c32014-01-29 14:30:38 -0800414
Victor K4d071892017-10-05 19:04:39 +0300415 if timeout <= 0:
416 fut = ensure_future(fut, loop=loop)
417
418 if fut.done():
419 return fut.result()
420
421 fut.cancel()
422 raise futures.TimeoutError()
423
Yury Selivanov7661db62016-05-16 15:38:39 -0400424 waiter = loop.create_future()
Victor Stinner59e08022014-08-28 11:19:25 +0200425 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
426 cb = functools.partial(_release_waiter, waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700427
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400428 fut = ensure_future(fut, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700429 fut.add_done_callback(cb)
430
431 try:
Victor Stinner59e08022014-08-28 11:19:25 +0200432 # wait until the future completes or the timeout
Victor Stinner922bc2c2015-01-15 16:29:10 +0100433 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200434 await waiter
Victor Stinner922bc2c2015-01-15 16:29:10 +0100435 except futures.CancelledError:
436 fut.remove_done_callback(cb)
437 fut.cancel()
438 raise
Victor Stinner59e08022014-08-28 11:19:25 +0200439
440 if fut.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700441 return fut.result()
442 else:
443 fut.remove_done_callback(cb)
Elvis Pranskevichuse2b340a2018-05-29 17:31:01 -0400444 # We must ensure that the task is not running
445 # after wait_for() returns.
446 # See https://bugs.python.org/issue32751
447 await _cancel_and_wait(fut, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700448 raise futures.TimeoutError()
449 finally:
450 timeout_handle.cancel()
451
452
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200453async def _wait(fs, timeout, return_when, loop):
Elvis Pranskevichuse2b340a2018-05-29 17:31:01 -0400454 """Internal helper for wait().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700455
456 The fs argument must be a collection of Futures.
457 """
458 assert fs, 'Set of Futures is empty.'
Yury Selivanov7661db62016-05-16 15:38:39 -0400459 waiter = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700460 timeout_handle = None
461 if timeout is not None:
462 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
463 counter = len(fs)
464
465 def _on_completion(f):
466 nonlocal counter
467 counter -= 1
468 if (counter <= 0 or
469 return_when == FIRST_COMPLETED or
470 return_when == FIRST_EXCEPTION and (not f.cancelled() and
471 f.exception() is not None)):
472 if timeout_handle is not None:
473 timeout_handle.cancel()
474 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200475 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700476
477 for f in fs:
478 f.add_done_callback(_on_completion)
479
480 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200481 await waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700482 finally:
483 if timeout_handle is not None:
484 timeout_handle.cancel()
485
486 done, pending = set(), set()
487 for f in fs:
488 f.remove_done_callback(_on_completion)
489 if f.done():
490 done.add(f)
491 else:
492 pending.add(f)
493 return done, pending
494
495
Elvis Pranskevichuse2b340a2018-05-29 17:31:01 -0400496async def _cancel_and_wait(fut, loop):
497 """Cancel the *fut* future or task and wait until it completes."""
498
499 waiter = loop.create_future()
500 cb = functools.partial(_release_waiter, waiter)
501 fut.add_done_callback(cb)
502
503 try:
504 fut.cancel()
505 # We cannot wait on *fut* directly to make
506 # sure _cancel_and_wait itself is reliably cancellable.
507 await waiter
508 finally:
509 fut.remove_done_callback(cb)
510
511
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700512# This is *not* a @coroutine! It is just an iterator (yielding Futures).
513def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800514 """Return an iterator whose values are coroutines.
515
516 When waiting for the yielded coroutines you'll get the results (or
517 exceptions!) of the original Futures (or coroutines), in the order
518 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700519
520 This differs from PEP 3148; the proper way to use this is:
521
522 for f in as_completed(fs):
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200523 result = await f # The 'await' may raise.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700524 # Use result.
525
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200526 If a timeout is specified, the 'await' will raise
Guido van Rossumb58f0532014-02-12 17:58:19 -0800527 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700528
529 Note: The futures 'f' are not necessarily members of fs.
530 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700531 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Yury Selivanov6370f342017-12-10 18:36:12 -0500532 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700533 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400534 todo = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800535 from .queues import Queue # Import here to avoid circular import problem.
536 done = Queue(loop=loop)
537 timeout_handle = None
538
539 def _on_timeout():
540 for f in todo:
541 f.remove_done_callback(_on_completion)
542 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
543 todo.clear() # Can't do todo.remove(f) in the loop.
544
545 def _on_completion(f):
546 if not todo:
547 return # _on_timeout() was here first.
548 todo.remove(f)
549 done.put_nowait(f)
550 if not todo and timeout_handle is not None:
551 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700552
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200553 async def _wait_for_one():
554 f = await done.get()
Guido van Rossumb58f0532014-02-12 17:58:19 -0800555 if f is None:
556 # Dummy value from _on_timeout().
557 raise futures.TimeoutError
558 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700559
Guido van Rossumb58f0532014-02-12 17:58:19 -0800560 for f in todo:
561 f.add_done_callback(_on_completion)
562 if todo and timeout is not None:
563 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700564 for _ in range(len(todo)):
565 yield _wait_for_one()
566
567
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200568@types.coroutine
569def __sleep0():
570 """Skip one event loop run cycle.
571
572 This is a private helper for 'asyncio.sleep()', used
573 when the 'delay' is set to 0. It uses a bare 'yield'
Yury Selivanov22feeb82018-01-24 11:31:01 -0500574 expression (which Task.__step knows how to handle)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200575 instead of creating a Future object.
576 """
577 yield
578
579
580async def sleep(delay, result=None, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700581 """Coroutine that completes after a given time (in seconds)."""
Andrew Svetlov5382c052017-12-17 16:41:30 +0200582 if delay <= 0:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200583 await __sleep0()
Yury Selivanovade04122015-11-05 14:29:04 -0500584 return result
585
Yury Selivanov7661db62016-05-16 15:38:39 -0400586 if loop is None:
587 loop = events.get_event_loop()
588 future = loop.create_future()
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500589 h = loop.call_later(delay,
590 futures._set_result_unless_cancelled,
591 future, result)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700592 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200593 return await future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700594 finally:
595 h.cancel()
596
597
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400598def ensure_future(coro_or_future, *, loop=None):
Yury Selivanov620279b2015-10-02 15:00:19 -0400599 """Wrap a coroutine or an awaitable in a future.
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400600
601 If the argument is a Future, it is returned directly.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700602 """
jimmylaie549c4b2018-05-28 06:42:05 -1000603 if coroutines.iscoroutine(coro_or_future):
Victor Stinner896a25a2014-07-08 11:29:25 +0200604 if loop is None:
605 loop = events.get_event_loop()
606 task = loop.create_task(coro_or_future)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200607 if task._source_traceback:
608 del task._source_traceback[-1]
609 return task
jimmylaie549c4b2018-05-28 06:42:05 -1000610 elif futures.isfuture(coro_or_future):
611 if loop is not None and loop is not futures._get_loop(coro_or_future):
612 raise ValueError('loop argument must agree with Future')
613 return coro_or_future
Victor Stinner3f438a92017-11-28 14:43:52 +0100614 elif inspect.isawaitable(coro_or_future):
Yury Selivanov620279b2015-10-02 15:00:19 -0400615 return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700616 else:
Charles Renwickae5b3262017-04-21 16:49:48 -0400617 raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
618 'required')
Yury Selivanov620279b2015-10-02 15:00:19 -0400619
620
621@coroutine
622def _wrap_awaitable(awaitable):
623 """Helper for asyncio.ensure_future().
624
625 Wraps awaitable (an object with __await__) into a coroutine
626 that will later be wrapped in a Task by ensure_future().
627 """
628 return (yield from awaitable.__await__())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700629
630
631class _GatheringFuture(futures.Future):
632 """Helper for gather().
633
634 This overrides cancel() to cancel all the children and act more
635 like Task.cancel(), which doesn't immediately mark itself as
636 cancelled.
637 """
638
639 def __init__(self, children, *, loop=None):
640 super().__init__(loop=loop)
641 self._children = children
Yury Selivanov863b6742018-05-29 17:20:02 -0400642 self._cancel_requested = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700643
644 def cancel(self):
645 if self.done():
646 return False
Yury Selivanov3d676152016-10-21 17:22:17 -0400647 ret = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700648 for child in self._children:
Yury Selivanov3d676152016-10-21 17:22:17 -0400649 if child.cancel():
650 ret = True
Yury Selivanov863b6742018-05-29 17:20:02 -0400651 if ret:
652 # If any child tasks were actually cancelled, we should
653 # propagate the cancellation request regardless of
654 # *return_exceptions* argument. See issue 32684.
655 self._cancel_requested = True
Yury Selivanov3d676152016-10-21 17:22:17 -0400656 return ret
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700657
658
659def gather(*coros_or_futures, loop=None, return_exceptions=False):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500660 """Return a future aggregating results from the given coroutines/futures.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700661
Guido van Rossume3c65a72016-09-30 08:17:15 -0700662 Coroutines will be wrapped in a future and scheduled in the event
663 loop. They will not necessarily be scheduled in the same order as
664 passed in.
665
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700666 All futures must share the same event loop. If all the tasks are
667 done successfully, the returned future's result is the list of
668 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500669 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700670 exceptions in the tasks are treated the same as successful
671 results, and gathered in the result list; otherwise, the first
672 raised exception will be immediately propagated to the returned
673 future.
674
675 Cancellation: if the outer Future is cancelled, all children (that
676 have not completed yet) are also cancelled. If any child is
677 cancelled, this is treated as if it raised CancelledError --
678 the outer Future is *not* cancelled in this case. (This is to
679 prevent the cancellation of one child to cause other children to
680 be cancelled.)
681 """
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200682 if not coros_or_futures:
Yury Selivanov7661db62016-05-16 15:38:39 -0400683 if loop is None:
684 loop = events.get_event_loop()
685 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700686 outer.set_result([])
687 return outer
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200688
Yury Selivanov36c2c042017-12-19 07:19:53 -0500689 def _done_callback(fut):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700690 nonlocal nfinished
Yury Selivanov36c2c042017-12-19 07:19:53 -0500691 nfinished += 1
692
Victor Stinner3531d902015-01-09 01:42:52 +0100693 if outer.done():
694 if not fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700695 # Mark exception retrieved.
696 fut.exception()
697 return
Victor Stinner3531d902015-01-09 01:42:52 +0100698
Yury Selivanov36c2c042017-12-19 07:19:53 -0500699 if not return_exceptions:
700 if fut.cancelled():
701 # Check if 'fut' is cancelled first, as
702 # 'fut.exception()' will *raise* a CancelledError
703 # instead of returning it.
704 exc = futures.CancelledError()
705 outer.set_exception(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700706 return
Yury Selivanov36c2c042017-12-19 07:19:53 -0500707 else:
708 exc = fut.exception()
709 if exc is not None:
710 outer.set_exception(exc)
711 return
712
713 if nfinished == nfuts:
714 # All futures are done; create a list of results
715 # and set it to the 'outer' future.
716 results = []
717
718 for fut in children:
719 if fut.cancelled():
720 # Check if 'fut' is cancelled first, as
721 # 'fut.exception()' will *raise* a CancelledError
722 # instead of returning it.
723 res = futures.CancelledError()
724 else:
725 res = fut.exception()
726 if res is None:
727 res = fut.result()
728 results.append(res)
729
Yury Selivanov863b6742018-05-29 17:20:02 -0400730 if outer._cancel_requested:
731 # If gather is being cancelled we must propagate the
732 # cancellation regardless of *return_exceptions* argument.
733 # See issue 32684.
734 outer.set_exception(futures.CancelledError())
735 else:
736 outer.set_result(results)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700737
Yury Selivanov36c2c042017-12-19 07:19:53 -0500738 arg_to_fut = {}
739 children = []
740 nfuts = 0
741 nfinished = 0
742 for arg in coros_or_futures:
743 if arg not in arg_to_fut:
744 fut = ensure_future(arg, loop=loop)
745 if loop is None:
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500746 loop = futures._get_loop(fut)
Yury Selivanov36c2c042017-12-19 07:19:53 -0500747 if fut is not arg:
748 # 'arg' was not a Future, therefore, 'fut' is a new
749 # Future created specifically for 'arg'. Since the caller
750 # can't control it, disable the "destroy pending task"
751 # warning.
752 fut._log_destroy_pending = False
753
754 nfuts += 1
755 arg_to_fut[arg] = fut
756 fut.add_done_callback(_done_callback)
757
758 else:
759 # There's a duplicate Future object in coros_or_futures.
760 fut = arg_to_fut[arg]
761
762 children.append(fut)
763
764 outer = _GatheringFuture(children, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700765 return outer
766
767
768def shield(arg, *, loop=None):
769 """Wait for a future, shielding it from cancellation.
770
771 The statement
772
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200773 res = await shield(something())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700774
775 is exactly equivalent to the statement
776
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200777 res = await something()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700778
779 *except* that if the coroutine containing it is cancelled, the
780 task running in something() is not cancelled. From the POV of
781 something(), the cancellation did not happen. But its caller is
782 still cancelled, so the yield-from expression still raises
783 CancelledError. Note: If something() is cancelled by other means
784 this will still cancel shield().
785
786 If you want to completely ignore cancellation (not recommended)
787 you can combine shield() with a try/except clause, as follows:
788
789 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200790 res = await shield(something())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700791 except CancelledError:
792 res = None
793 """
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400794 inner = ensure_future(arg, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700795 if inner.done():
796 # Shortcut.
797 return inner
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500798 loop = futures._get_loop(inner)
Yury Selivanov7661db62016-05-16 15:38:39 -0400799 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700800
801 def _done_callback(inner):
802 if outer.cancelled():
Victor Stinner3531d902015-01-09 01:42:52 +0100803 if not inner.cancelled():
804 # Mark inner's result as retrieved.
805 inner.exception()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700806 return
Victor Stinner3531d902015-01-09 01:42:52 +0100807
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700808 if inner.cancelled():
809 outer.cancel()
810 else:
811 exc = inner.exception()
812 if exc is not None:
813 outer.set_exception(exc)
814 else:
815 outer.set_result(inner.result())
816
817 inner.add_done_callback(_done_callback)
818 return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700819
820
821def run_coroutine_threadsafe(coro, loop):
822 """Submit a coroutine object to a given event loop.
823
824 Return a concurrent.futures.Future to access the result.
825 """
826 if not coroutines.iscoroutine(coro):
827 raise TypeError('A coroutine object is required')
828 future = concurrent.futures.Future()
829
830 def callback():
Guido van Rossum601953b2015-10-05 16:20:00 -0700831 try:
832 futures._chain_future(ensure_future(coro, loop=loop), future)
833 except Exception as exc:
834 if future.set_running_or_notify_cancel():
835 future.set_exception(exc)
836 raise
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700837
838 loop.call_soon_threadsafe(callback)
839 return future
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200840
841
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500842# WeakSet containing all alive tasks.
843_all_tasks = weakref.WeakSet()
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200844
845# Dictionary containing tasks that are currently active in
846# all running event loops. {EventLoop: Task}
847_current_tasks = {}
848
849
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500850def _register_task(task):
851 """Register a new task in asyncio as executed by loop."""
852 _all_tasks.add(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200853
854
855def _enter_task(loop, task):
856 current_task = _current_tasks.get(loop)
857 if current_task is not None:
858 raise RuntimeError(f"Cannot enter into task {task!r} while another "
859 f"task {current_task!r} is being executed.")
860 _current_tasks[loop] = task
861
862
863def _leave_task(loop, task):
864 current_task = _current_tasks.get(loop)
865 if current_task is not task:
866 raise RuntimeError(f"Leaving task {task!r} does not match "
867 f"the current task {current_task!r}.")
868 del _current_tasks[loop]
869
870
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500871def _unregister_task(task):
872 """Unregister a task."""
873 _all_tasks.discard(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200874
875
876_py_register_task = _register_task
877_py_unregister_task = _unregister_task
878_py_enter_task = _enter_task
879_py_leave_task = _leave_task
880
881
882try:
883 from _asyncio import (_register_task, _unregister_task,
884 _enter_task, _leave_task,
885 _all_tasks, _current_tasks)
886except ImportError:
887 pass
888else:
889 _c_register_task = _register_task
890 _c_unregister_task = _unregister_task
891 _c_enter_task = _enter_task
892 _c_leave_task = _leave_task