# Source blob: 67fb57c6a78159e6ba49e1f2f2a661d88f9cd9bf
"""Support for tasks, coroutines and the scheduler."""
2
# Names exported by ``from <this module> import *``.  The underscore-prefixed
# task bookkeeping hooks are listed explicitly so they are exported as well.
__all__ = (
    'Task', 'create_task',
    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
    'wait', 'wait_for', 'as_completed', 'sleep',
    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
    'current_task', 'all_tasks',
    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
import concurrent.futures
import contextvars
import functools
import inspect
import types
import warnings
import weakref

from . import base_tasks
from . import coroutines
from . import events
from . import futures
from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026
def current_task(loop=None):
    """Return the task currently being executed, or None.

    If *loop* is None, the running event loop is used (obtained through
    events.get_running_loop()).  None is returned when no task is
    recorded as current for that loop.
    """
    if loop is None:
        loop = events.get_running_loop()
    return _current_tasks.get(loop)
32
33
def all_tasks(loop=None):
    """Return a set of all not yet finished tasks for the loop.

    If *loop* is None, the running event loop is used (obtained through
    events.get_running_loop()).
    """
    if loop is None:
        loop = events.get_running_loop()
    # Filtering the shared registry directly is not safe: if it changes
    # size while being iterated (for instance because another thread
    # registers a task, or -- presumably, the registry being a weak
    # collection defined elsewhere in this module -- a task is garbage
    # collected), iteration raises RuntimeError.  Take a snapshot first.
    # Building the snapshot iterates too, so retry a bounded number of
    # times before giving up and propagating the error.
    attempts = 0
    while True:
        try:
            tasks = list(_all_tasks)
        except RuntimeError:
            attempts += 1
            if attempts >= 1000:
                raise
        else:
            break
    return {t for t in tasks
            if futures._get_loop(t) is loop and not t.done()}
40
41
def _all_tasks_compat(loop=None):
    # Different from "all_tasks()" by returning *all* Tasks, including
    # the completed ones.  Used to implement the deprecated
    # "Task.all_tasks()" method.
    if loop is None:
        loop = events.get_event_loop()
    # Snapshot the shared registry before filtering: iterating it directly
    # raises RuntimeError if it changes size in the meantime.  Building
    # the snapshot iterates too, so retry a bounded number of times.
    attempts = 0
    while True:
        try:
            tasks = list(_all_tasks)
        except RuntimeError:
            attempts += 1
            if attempts >= 1000:
                raise
        else:
            break
    return {t for t in tasks if futures._get_loop(t) is loop}
Andrew Svetlov44d1a592017-12-16 21:58:38 +020049
50
class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and __step() is scheduled;
    # - or _fut_waiter is some Future, and __step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # __wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be __wakeup().

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        warnings.warn("Task.current_task() is deprecated, "
                      "use asyncio.current_task() instead",
                      PendingDeprecationWarning,
                      stacklevel=2)
        if loop is None:
            # Unlike the module-level current_task(), fall back to
            # get_event_loop() rather than get_running_loop().
            loop = events.get_event_loop()
        return current_task(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        Tasks that are already done are included.
        """
        warnings.warn("Task.all_tasks() is deprecated, "
                      "use asyncio.all_tasks() instead",
                      PendingDeprecationWarning,
                      stacklevel=2)
        return _all_tasks_compat(loop)

    def __init__(self, coro, *, loop=None):
        """Wrap the coroutine object *coro* and schedule its first step.

        Raises TypeError if *coro* is not a coroutine object.
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the last recorded frame (this constructor call).
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    def __del__(self):
        # Report a task that is garbage collected while still pending,
        # unless logging was disabled through _log_destroy_pending.
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    def _repr_info(self):
        return base_tasks._task_repr_info(self)

    def set_result(self, result):
        # The result of a Task is set only by __step(), through super().
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        # The exception of a Task is set only by __step(), through super().
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        return True

    def __step(self, exc=None):
        # Advance the wrapped coroutine by one step: throw *exc* into it
        # if given, otherwise send None, then act on what it yields,
        # returns or raises.
        if self.done():
            raise futures.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            # A cancellation was requested while a step was scheduled;
            # deliver it now as a CancelledError.
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().set_exception(futures.CancelledError())
            else:
                super().set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            super().set_exception(exc)
        except BaseException as exc:
            # Record the exception on the task, but let it propagate too.
            super().set_exception(exc)
            raise
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            # cancel() was called during this step;
                            # forward it to the future just awaited.
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        # Done-callback installed on the awaited future by __step():
        # resume the coroutine, throwing the future's exception if any.
        try:
            future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `__step(value, None)` instead of `__step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.
301
302
# Keep a reference to the pure-Python implementation under a private name
# before ``Task`` is possibly rebound to the C implementation below.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    # No C accelerator module available; keep the pure-Python Task.
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task
313
314
def create_task(coro):
    """Schedule the execution of a coroutine object in a spawned task.

    The task is created on the running event loop (obtained through
    events.get_running_loop()).

    Return a Task object.
    """
    loop = events.get_running_loop()
    return loop.create_task(coro)
322
323
# wait() and as_completed() similar to those in PEP 3148.

# Accepted values for the *return_when* argument of wait(); these are the
# very same objects as the constants of concurrent.futures.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
329
330
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures and coroutines given by fs to complete.

    The fs iterable must not be empty, and must not itself be a single
    future or coroutine.

    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = await asyncio.wait(fs)

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.

    Raises TypeError if fs is a future or a coroutine, and ValueError if
    fs is empty or return_when is not one of FIRST_COMPLETED,
    FIRST_EXCEPTION or ALL_COMPLETED.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError(f'Invalid return_when value: {return_when}')

    if loop is None:
        loop = events.get_event_loop()

    # Deduplicate the inputs first, then wrap each of them in a future.
    fs = {ensure_future(f, loop=loop) for f in set(fs)}

    return await _wait(fs, timeout, return_when, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700360
361
def _release_waiter(waiter, *args):
    """Complete *waiter* with a None result unless it is already done.

    Extra positional arguments are accepted and ignored, so the function
    can be used both as a timer callback and as a future done-callback
    (which is called with the finished future).
    """
    if not waiter.done():
        waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700365
366
async def wait_for(fut, timeout, *, loop=None):
    """Wait for the single Future or coroutine to complete, with timeout.

    Coroutine will be wrapped in Task.

    Returns result of the Future or coroutine.  When a timeout occurs,
    it cancels the task and raises TimeoutError.  To avoid the task
    cancellation, wrap it in shield().

    If the wait is cancelled, the task is also cancelled.

    If timeout is None, fut is simply awaited.  If timeout is zero or
    negative, the result is returned only when fut is already done;
    otherwise fut is cancelled and TimeoutError is raised right away.

    This function is a coroutine.
    """
    if loop is None:
        loop = events.get_event_loop()

    if timeout is None:
        return await fut

    if timeout <= 0:
        fut = ensure_future(fut, loop=loop)

        if fut.done():
            return fut.result()

        fut.cancel()
        raise futures.TimeoutError()

    # 'waiter' is completed by whichever happens first: the timer firing
    # or 'fut' finishing (through the done-callback 'cb').
    waiter = loop.create_future()
    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    cb = functools.partial(_release_waiter, waiter)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(cb)

    try:
        # wait until the future completes or the timeout
        try:
            await waiter
        except futures.CancelledError:
            # The wait itself was cancelled: propagate it to 'fut'.
            fut.remove_done_callback(cb)
            fut.cancel()
            raise

        if fut.done():
            return fut.result()
        else:
            fut.remove_done_callback(cb)
            # NOTE(review): the cancellation is requested but not awaited,
            # so 'fut' may still be running (e.g. its cleanup code) when
            # TimeoutError reaches the caller.
            fut.cancel()
            raise futures.TimeoutError()
    finally:
        timeout_handle.cancel()
419
420
async def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a non-empty collection of Futures.

    Returns a (done, pending) pair of sets, computed once the
    return_when condition is met or the timeout (if not None) expires.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    timeout_handle = None
    if timeout is not None:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)  # Number of futures that have not completed yet.

    def _on_completion(f):
        nonlocal counter
        counter -= 1
        if (counter <= 0 or
            return_when == FIRST_COMPLETED or
            return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                f.exception() is not None)):
            # The wake-up condition is met: stop the timer and release
            # the waiter (unless it has been completed already).
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        await waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()

    done, pending = set(), set()
    for f in fs:
        f.remove_done_callback(_on_completion)
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending
462
463
# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    When waiting for the yielded coroutines you'll get the results (or
    exceptions!) of the original Futures (or coroutines), in the order
    in which and as soon as they complete.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = await f  # The 'await' may raise.
            # Use result.

    If a timeout is specified, the 'await' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    loop = loop if loop is not None else events.get_event_loop()
    todo = {ensure_future(f, loop=loop) for f in set(fs)}
    from .queues import Queue  # Import here to avoid circular import problem.
    done = Queue(loop=loop)
    timeout_handle = None

    def _on_timeout():
        for f in todo:
            f.remove_done_callback(_on_completion)
            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
        todo.clear()  # Can't do todo.remove(f) in the loop.

    def _on_completion(f):
        if not todo:
            return  # _on_timeout() was here first.
        todo.remove(f)
        done.put_nowait(f)
        if not todo and timeout_handle is not None:
            timeout_handle.cancel()

    async def _wait_for_one():
        f = await done.get()
        if f is None:
            # Dummy value from _on_timeout().
            raise futures.TimeoutError
        return f.result()  # May raise f.exception().

    for f in todo:
        f.add_done_callback(_on_completion)
    if todo and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    # Yield exactly one awaitable per input future.
    for _ in range(len(todo)):
        yield _wait_for_one()
518
519
@types.coroutine
def __sleep0():
    """Skip one event loop run cycle.

    This is a private helper for 'asyncio.sleep()', used
    when the 'delay' is set to 0.  It uses a bare 'yield'
    expression (which Task.__step knows how to handle)
    instead of creating a Future object.
    """
    yield
530
531
async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    *result* is returned to the caller once the delay has elapsed.  A
    zero or negative delay only yields control to the event loop once,
    without scheduling a timer.
    """
    if delay <= 0:
        await __sleep0()
        return result

    if loop is None:
        loop = events.get_event_loop()
    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        # Make sure the timer is cancelled if the sleep is interrupted.
        h.cancel()
548
549
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.

    Raises ValueError if a Future is given together with a *loop* that
    is not the Future's own loop, and TypeError if the argument is
    neither a Future, a coroutine nor an awaitable.
    """
    if coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            # Drop the last recorded frame (this helper).
            del task._source_traceback[-1]
        return task
    elif futures.isfuture(coro_or_future):
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('loop argument must agree with Future')
        return coro_or_future
    elif inspect.isawaitable(coro_or_future):
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    else:
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')
Yury Selivanov620279b2015-10-02 15:00:19 -0400571
572
@coroutine
def _wrap_awaitable(awaitable):
    """Helper for asyncio.ensure_future().

    Wraps awaitable (an object with __await__) into a coroutine
    that will later be wrapped in a Task by ensure_future().
    """
    return (yield from awaitable.__await__())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700581
582
class _GatheringFuture(futures.Future):
    """Helper for gather().

    This overrides cancel() to cancel all the children and act more
    like Task.cancel(), which doesn't immediately mark itself as
    cancelled.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children

    def cancel(self):
        """Cancel every child; return True if any child accepted it.

        Returns False without touching the children when this future is
        already done.
        """
        if self.done():
            return False
        ret = False
        # Ask every child, even after one of them has accepted.
        for child in self._children:
            if child.cancel():
                ret = True
        return ret
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700603
604
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future that aggregates the results of several awaitables.

    Each coroutine is wrapped in a future and scheduled on the event
    loop; the scheduling order is not guaranteed to match the argument
    order.  All futures must belong to the same event loop.

    When every child finishes successfully, the result of the returned
    future is a list of their results, ordered like the arguments (not
    by completion time).  With *return_exceptions* set to True,
    exceptions raised by children are collected in that list alongside
    normal results; otherwise the first exception raised is propagated
    to the returned future right away.

    Cancellation: cancelling the outer future cancels every child that
    has not completed yet.  A cancelled child is treated as though it
    raised CancelledError -- the outer future is *not* cancelled in
    that case, so that cancelling one child does not cause the other
    children to be cancelled.
    """
    if not coros_or_futures:
        # Nothing to wait for: hand back an already-resolved future.
        if loop is None:
            loop = events.get_event_loop()
        empty = loop.create_future()
        empty.set_result([])
        return empty

    def _outcome(child):
        # Ask cancelled() first: exception() would *raise* a
        # CancelledError for a cancelled future instead of returning it.
        if child.cancelled():
            return futures.CancelledError()
        error = child.exception()
        return child.result() if error is None else error

    def _done_callback(fut):
        nonlocal done_count
        done_count += 1

        if outer.done():
            # The outer future no longer needs this child; only mark
            # the child's exception as retrieved.
            if not fut.cancelled():
                fut.exception()
            return

        if not return_exceptions:
            # A cancelled child counts as a CancelledError failure.
            failure = (futures.CancelledError() if fut.cancelled()
                       else fut.exception())
            if failure is not None:
                outer.set_exception(failure)
                return

        if done_count == unique_count:
            # Every distinct child is done: publish the results in
            # argument order (duplicates appear once per occurrence).
            outer.set_result([_outcome(child) for child in children])

    seen = {}
    children = []
    unique_count = 0
    done_count = 0
    for arg in coros_or_futures:
        fut = seen.get(arg)
        if fut is None:
            # First occurrence of this argument.
            fut = ensure_future(arg, loop=loop)
            if loop is None:
                loop = futures._get_loop(fut)
            if fut is not arg:
                # 'arg' was not a Future, so 'fut' was created just for
                # it and the caller has no handle on it; silence the
                # "destroy pending task" warning.
                fut._log_destroy_pending = False

            unique_count += 1
            seen[arg] = fut
            fut.add_done_callback(_done_callback)
        # For a duplicate argument, 'fut' is the future registered earlier.
        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    return outer
706
707
def shield(arg, *, loop=None):
    """Wait for a future while protecting it from cancellation.

    The statement

        res = await shield(something())

    behaves exactly like

        res = await something()

    *except* that cancelling the coroutine containing it does not
    cancel the task running something().  From something()'s point of
    view no cancellation took place.  The caller is still cancelled,
    though, so the await expression still raises CancelledError.
    Note: if something() is cancelled by other means, shield() is
    cancelled as well.

    To ignore cancellation completely (not recommended), wrap the
    call in a try/except clause:

        try:
            res = await shield(something())
        except CancelledError:
            res = None
    """
    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Shortcut: nothing left to protect.
        return inner
    outer = futures._get_loop(inner).create_future()

    def _done_callback(finished):
        if outer.cancelled():
            # Nobody is waiting on 'outer' any more; just mark the
            # inner result as retrieved.
            if not finished.cancelled():
                finished.exception()
            return

        if finished.cancelled():
            outer.cancel()
            return

        error = finished.exception()
        if error is None:
            outer.set_result(finished.result())
        else:
            outer.set_exception(error)

    inner.add_done_callback(_done_callback)
    return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700759
760
def run_coroutine_threadsafe(coro, loop):
    """Schedule a coroutine object on the given event loop.

    Return a concurrent.futures.Future that gives access to the
    result.  Raise TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    future = concurrent.futures.Future()

    def callback():
        # Invoked by the loop via call_soon_threadsafe().
        try:
            task = ensure_future(coro, loop=loop)
            futures._chain_future(task, future)
        except Exception as exc:
            # Report the failure through the returned future (unless it
            # was already cancelled), then let it propagate.
            if future.set_running_or_notify_cancel():
                future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return future
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200780
781
# WeakSet containing all alive tasks.  _register_task() adds to it and
# _unregister_task() discards from it; holding the tasks weakly means
# this set by itself does not keep a task alive.
_all_tasks = weakref.WeakSet()

# Dictionary containing tasks that are currently active in
# all running event loops. {EventLoop: Task}
# Written by _enter_task() / _leave_task() and read by current_task().
_current_tasks = {}
788
789
def _register_task(task):
    """Register a new task in asyncio by adding it to `_all_tasks`."""
    _all_tasks.add(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200793
794
def _enter_task(loop, task):
    """Record *task* as the task currently executing on *loop*.

    Raise RuntimeError if *loop* already has a current task recorded.
    """
    current_task = _current_tasks.get(loop)
    if current_task is None:
        _current_tasks[loop] = task
        return
    raise RuntimeError(f"Cannot enter into task {task!r} while another "
                       f"task {current_task!r} is being executed.")
801
802
def _leave_task(loop, task):
    """Clear the current-task record of *loop*, which must be *task*.

    Raise RuntimeError if the task recorded for *loop* is not *task*.
    """
    current_task = _current_tasks.get(loop)
    if current_task is task:
        del _current_tasks[loop]
        return
    raise RuntimeError(f"Leaving task {task!r} does not match "
                       f"the current task {current_task!r}.")
809
810
def _unregister_task(task):
    """Unregister a task by discarding it from `_all_tasks`.

    discard() is used, so a task that is not in the set is ignored.
    """
    _all_tasks.discard(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200814
815
# Keep references to the pure-Python implementations defined above
# under `_py_*` names, because the import below may rebind the plain
# names.
_py_register_task = _register_task
_py_unregister_task = _unregister_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task


# If the optional `_asyncio` module is importable, its functions and its
# `_all_tasks` / `_current_tasks` containers replace the ones defined in
# this module.  When the import fails, the Python versions stay in place.
try:
    from _asyncio import (_register_task, _unregister_task,
                          _enter_task, _leave_task,
                          _all_tasks, _current_tasks)
except ImportError:
    pass
else:
    # Expose the `_asyncio` implementations under `_c_*` names as well.
    _c_register_task = _register_task
    _c_unregister_task = _unregister_task
    _c_enter_task = _enter_task
    _c_leave_task = _leave_task