blob: 4a9db2a3a05ceb13b511958b7f760f495dbdbb77 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
Yury Selivanov6370f342017-12-10 18:36:12 -05003__all__ = (
Andrew Svetlovf74ef452017-12-15 07:04:38 +02004 'Task', 'create_task',
Yury Selivanov6370f342017-12-10 18:36:12 -05005 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
Yury Selivanov9edad3c2017-12-11 10:03:48 -05006 'wait', 'wait_for', 'as_completed', 'sleep',
Yury Selivanov6370f342017-12-10 18:36:12 -05007 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
Andrew Svetlov44d1a592017-12-16 21:58:38 +02008 'current_task', 'all_tasks',
9 '_register_task', '_unregister_task', '_enter_task', '_leave_task',
Yury Selivanov6370f342017-12-10 18:36:12 -050010)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import concurrent.futures
Yury Selivanovf23746a2018-01-22 19:11:18 -050013import contextvars
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import functools
15import inspect
Andrew Svetlov5f841b52017-12-09 00:23:48 +020016import types
Yury Selivanov59eb9a42015-05-11 14:48:38 -040017import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018import weakref
19
Yury Selivanova0c1ba62016-10-28 12:52:37 -040020from . import base_tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020021from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022from . import events
23from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020024from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026
Andrew Svetlov44d1a592017-12-16 21:58:38 +020027def current_task(loop=None):
28 """Return a currently executed task."""
29 if loop is None:
30 loop = events.get_running_loop()
31 return _current_tasks.get(loop)
32
33
34def all_tasks(loop=None):
35 """Return a set of all tasks for the loop."""
36 if loop is None:
37 loop = events.get_event_loop()
Yury Selivanovca9b36c2017-12-23 15:04:15 -050038 return {t for t in _all_tasks if futures._get_loop(t) is loop}
Andrew Svetlov44d1a592017-12-16 21:58:38 +020039
40
Yury Selivanov0cf16f92017-12-25 10:48:15 -050041class Task(futures._PyFuture): # Inherit Python Task implementation
42 # from a Python Future implementation.
43
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070044 """A coroutine wrapped in a Future."""
45
46 # An important invariant maintained while a Task not done:
47 #
48 # - Either _fut_waiter is None, and _step() is scheduled;
49 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
50 #
51 # The only transition from the latter to the former is through
52 # _wakeup(). When _fut_waiter is not None, one of its callbacks
53 # must be _wakeup().
54
Victor Stinnerfe22e092014-12-04 23:00:13 +010055 # If False, don't log a message if the task is destroyed whereas its
56 # status is still pending
57 _log_destroy_pending = True
58
Guido van Rossum1a605ed2013-12-06 12:57:40 -080059 @classmethod
60 def current_task(cls, loop=None):
61 """Return the currently running task in an event loop or None.
62
63 By default the current task for the current event loop is returned.
64
65 None is returned when called not in the context of a Task.
66 """
Andrew Svetlov44d1a592017-12-16 21:58:38 +020067 warnings.warn("Task.current_task() is deprecated, "
68 "use asyncio.current_task() instead",
69 PendingDeprecationWarning,
70 stacklevel=2)
Guido van Rossum1a605ed2013-12-06 12:57:40 -080071 if loop is None:
72 loop = events.get_event_loop()
Andrew Svetlov44d1a592017-12-16 21:58:38 +020073 return current_task(loop)
Guido van Rossum1a605ed2013-12-06 12:57:40 -080074
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070075 @classmethod
76 def all_tasks(cls, loop=None):
77 """Return a set of all tasks for an event loop.
78
79 By default all tasks for the current event loop are returned.
80 """
Andrew Svetlov44d1a592017-12-16 21:58:38 +020081 warnings.warn("Task.all_tasks() is deprecated, "
82 "use asyncio.all_tasks() instead",
83 PendingDeprecationWarning,
84 stacklevel=2)
85 return all_tasks(loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070086
87 def __init__(self, coro, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070088 super().__init__(loop=loop)
Victor Stinner80f53aa2014-06-27 13:52:20 +020089 if self._source_traceback:
90 del self._source_traceback[-1]
Andrew Svetlovf74ef452017-12-15 07:04:38 +020091 if not coroutines.iscoroutine(coro):
92 # raise after Future.__init__(), attrs are required for __del__
93 # prevent logging for pending task in __del__
94 self._log_destroy_pending = False
95 raise TypeError(f"a coroutine was expected, got {coro!r}")
96
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070097 self._must_cancel = False
Andrew Svetlovf74ef452017-12-15 07:04:38 +020098 self._fut_waiter = None
99 self._coro = coro
Yury Selivanovf23746a2018-01-22 19:11:18 -0500100 self._context = contextvars.copy_context()
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200101
Yury Selivanov22feeb82018-01-24 11:31:01 -0500102 self._loop.call_soon(self.__step, context=self._context)
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500103 _register_task(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700104
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900105 def __del__(self):
106 if self._state == futures._PENDING and self._log_destroy_pending:
107 context = {
108 'task': self,
109 'message': 'Task was destroyed but it is pending!',
110 }
111 if self._source_traceback:
112 context['source_traceback'] = self._source_traceback
113 self._loop.call_exception_handler(context)
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500114 super().__del__()
Victor Stinnera02f81f2014-06-24 22:37:53 +0200115
Victor Stinner313a9802014-07-29 12:58:23 +0200116 def _repr_info(self):
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400117 return base_tasks._task_repr_info(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700118
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500119 def set_result(self, result):
120 raise RuntimeError('Task does not support set_result operation')
121
122 def set_exception(self, exception):
123 raise RuntimeError('Task does not support set_exception operation')
124
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700125 def get_stack(self, *, limit=None):
126 """Return the list of stack frames for this task's coroutine.
127
Victor Stinnerd87de832014-12-02 17:57:04 +0100128 If the coroutine is not done, this returns the stack where it is
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700129 suspended. If the coroutine has completed successfully or was
130 cancelled, this returns an empty list. If the coroutine was
131 terminated by an exception, this returns the list of traceback
132 frames.
133
134 The frames are always ordered from oldest to newest.
135
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500136 The optional limit gives the maximum number of frames to
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700137 return; by default all available frames are returned. Its
138 meaning differs depending on whether a stack or a traceback is
139 returned: the newest frames of a stack are returned, but the
140 oldest frames of a traceback are returned. (This matches the
141 behavior of the traceback module.)
142
143 For reasons beyond our control, only one stack frame is
144 returned for a suspended coroutine.
145 """
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400146 return base_tasks._task_get_stack(self, limit)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700147
148 def print_stack(self, *, limit=None, file=None):
149 """Print the stack or traceback for this task's coroutine.
150
151 This produces output similar to that of the traceback module,
152 for the frames retrieved by get_stack(). The limit argument
153 is passed to get_stack(). The file argument is an I/O stream
R David Murray8e069d52014-09-24 13:13:45 -0400154 to which the output is written; by default output is written
155 to sys.stderr.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700156 """
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400157 return base_tasks._task_print_stack(self, limit, file)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700158
159 def cancel(self):
R David Murray8e069d52014-09-24 13:13:45 -0400160 """Request that this task cancel itself.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200161
Victor Stinner8d213572014-06-02 23:06:46 +0200162 This arranges for a CancelledError to be thrown into the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200163 wrapped coroutine on the next cycle through the event loop.
164 The coroutine then has a chance to clean up or even deny
165 the request using try/except/finally.
166
R David Murray8e069d52014-09-24 13:13:45 -0400167 Unlike Future.cancel, this does not guarantee that the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200168 task will be cancelled: the exception might be caught and
R David Murray8e069d52014-09-24 13:13:45 -0400169 acted upon, delaying cancellation of the task or preventing
170 cancellation completely. The task may also return a value or
171 raise a different exception.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200172
173 Immediately after this method is called, Task.cancelled() will
174 not return True (unless the task was already cancelled). A
175 task will be marked as cancelled when the wrapped coroutine
176 terminates with a CancelledError exception (even if cancel()
177 was not called).
178 """
Yury Selivanov7ce1c6f2017-06-11 13:49:18 +0000179 self._log_traceback = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700180 if self.done():
181 return False
182 if self._fut_waiter is not None:
183 if self._fut_waiter.cancel():
184 # Leave self._fut_waiter; it may be a Task that
185 # catches and ignores the cancellation so we may have
186 # to cancel it again later.
187 return True
Yury Selivanov22feeb82018-01-24 11:31:01 -0500188 # It must be the case that self.__step is already scheduled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700189 self._must_cancel = True
190 return True
191
Yury Selivanov22feeb82018-01-24 11:31:01 -0500192 def __step(self, exc=None):
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500193 if self.done():
194 raise futures.InvalidStateError(
195 f'_step(): already done: {self!r}, {exc!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700196 if self._must_cancel:
197 if not isinstance(exc, futures.CancelledError):
198 exc = futures.CancelledError()
199 self._must_cancel = False
200 coro = self._coro
201 self._fut_waiter = None
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800202
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200203 _enter_task(self._loop, self)
Yury Selivanovd59bba82015-11-20 12:41:03 -0500204 # Call either coro.throw(exc) or coro.send(None).
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700205 try:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500206 if exc is None:
207 # We use the `send` method directly, because coroutines
208 # don't have `__iter__` and `__next__` methods.
209 result = coro.send(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700210 else:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500211 result = coro.throw(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700212 except StopIteration as exc:
INADA Naoki991adca2017-05-11 21:18:38 +0900213 if self._must_cancel:
214 # Task is cancelled right before coro stops.
215 self._must_cancel = False
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500216 super().set_exception(futures.CancelledError())
INADA Naoki991adca2017-05-11 21:18:38 +0900217 else:
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500218 super().set_result(exc.value)
Yury Selivanov4145c832016-10-09 12:19:12 -0400219 except futures.CancelledError:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700220 super().cancel() # I.e., Future.cancel(self).
221 except Exception as exc:
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500222 super().set_exception(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700223 except BaseException as exc:
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500224 super().set_exception(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700225 raise
226 else:
Guido van Rossum1140a032016-09-09 12:54:54 -0700227 blocking = getattr(result, '_asyncio_future_blocking', None)
228 if blocking is not None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700229 # Yielded Future must come from Future.__iter__().
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500230 if futures._get_loop(result) is not self._loop:
Yury Selivanov6370f342017-12-10 18:36:12 -0500231 new_exc = RuntimeError(
232 f'Task {self!r} got Future '
233 f'{result!r} attached to a different loop')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500234 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500235 self.__step, new_exc, context=self._context)
Guido van Rossum1140a032016-09-09 12:54:54 -0700236 elif blocking:
Yury Selivanov4145c832016-10-09 12:19:12 -0400237 if result is self:
Yury Selivanov6370f342017-12-10 18:36:12 -0500238 new_exc = RuntimeError(
239 f'Task cannot await on itself: {self!r}')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500240 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500241 self.__step, new_exc, context=self._context)
Yury Selivanov4145c832016-10-09 12:19:12 -0400242 else:
243 result._asyncio_future_blocking = False
Yury Selivanovf23746a2018-01-22 19:11:18 -0500244 result.add_done_callback(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500245 self.__wakeup, context=self._context)
Yury Selivanov4145c832016-10-09 12:19:12 -0400246 self._fut_waiter = result
247 if self._must_cancel:
248 if self._fut_waiter.cancel():
249 self._must_cancel = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700250 else:
Yury Selivanov6370f342017-12-10 18:36:12 -0500251 new_exc = RuntimeError(
252 f'yield was used instead of yield from '
253 f'in task {self!r} with {result!r}')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500254 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500255 self.__step, new_exc, context=self._context)
Yury Selivanov6370f342017-12-10 18:36:12 -0500256
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700257 elif result is None:
258 # Bare yield relinquishes control for one event loop iteration.
Yury Selivanov22feeb82018-01-24 11:31:01 -0500259 self._loop.call_soon(self.__step, context=self._context)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700260 elif inspect.isgenerator(result):
261 # Yielding a generator is just wrong.
Yury Selivanov6370f342017-12-10 18:36:12 -0500262 new_exc = RuntimeError(
263 f'yield was used instead of yield from for '
Miss Islington (bot)52d17412018-05-20 07:34:28 -0700264 f'generator in task {self!r} with {result!r}')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500265 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500266 self.__step, new_exc, context=self._context)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700267 else:
268 # Yielding something else is an error.
Yury Selivanov6370f342017-12-10 18:36:12 -0500269 new_exc = RuntimeError(f'Task got bad yield: {result!r}')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500270 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500271 self.__step, new_exc, context=self._context)
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800272 finally:
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200273 _leave_task(self._loop, self)
Victor Stinnerd74ac822014-03-04 23:07:08 +0100274 self = None # Needed to break cycles when an exception occurs.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700275
Yury Selivanov22feeb82018-01-24 11:31:01 -0500276 def __wakeup(self, future):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700277 try:
Yury Selivanova4afc482015-11-16 15:12:10 -0500278 future.result()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700279 except Exception as exc:
280 # This may also be a cancellation.
Yury Selivanov22feeb82018-01-24 11:31:01 -0500281 self.__step(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700282 else:
Yury Selivanova4afc482015-11-16 15:12:10 -0500283 # Don't pass the value of `future.result()` explicitly,
284 # as `Future.__iter__` and `Future.__await__` don't need it.
285 # If we call `_step(value, None)` instead of `_step()`,
286 # Python eval loop would use `.send(value)` method call,
287 # instead of `__next__()`, which is slower for futures
288 # that return non-generator iterators from their `__iter__`.
Yury Selivanov22feeb82018-01-24 11:31:01 -0500289 self.__step()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700290 self = None # Needed to break cycles when an exception occurs.
291
292
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400293_PyTask = Task
294
295
296try:
297 import _asyncio
298except ImportError:
299 pass
300else:
301 # _CTask is needed for tests.
302 Task = _CTask = _asyncio.Task
303
304
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200305def create_task(coro):
306 """Schedule the execution of a coroutine object in a spawn task.
307
308 Return a Task object.
309 """
310 loop = events.get_running_loop()
311 return loop.create_task(coro)
312
313
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700314# wait() and as_completed() similar to those in PEP 3148.
315
316FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
317FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
318ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
319
320
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200321async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700322 """Wait for the Futures and coroutines given by fs to complete.
323
Victor Stinnerdb74d982014-06-10 11:16:05 +0200324 The sequence futures must not be empty.
325
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700326 Coroutines will be wrapped in Tasks.
327
328 Returns two sets of Future: (done, pending).
329
330 Usage:
331
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200332 done, pending = await asyncio.wait(fs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700333
334 Note: This does not raise TimeoutError! Futures that aren't done
335 when the timeout occurs are returned in the second set.
336 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700337 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Yury Selivanov6370f342017-12-10 18:36:12 -0500338 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700339 if not fs:
340 raise ValueError('Set of coroutines/Futures is empty.')
Victor Stinnere931f7b2014-07-16 18:50:39 +0200341 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
Yury Selivanov6370f342017-12-10 18:36:12 -0500342 raise ValueError(f'Invalid return_when value: {return_when}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700343
344 if loop is None:
345 loop = events.get_event_loop()
346
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400347 fs = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700348
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200349 return await _wait(fs, timeout, return_when, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700350
351
Victor Stinner59e08022014-08-28 11:19:25 +0200352def _release_waiter(waiter, *args):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700353 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200354 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700355
356
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200357async def wait_for(fut, timeout, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700358 """Wait for the single Future or coroutine to complete, with timeout.
359
360 Coroutine will be wrapped in Task.
361
Victor Stinner421e49b2014-01-23 17:40:59 +0100362 Returns result of the Future or coroutine. When a timeout occurs,
363 it cancels the task and raises TimeoutError. To avoid the task
364 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700365
Victor Stinner922bc2c2015-01-15 16:29:10 +0100366 If the wait is cancelled, the task is also cancelled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700367
Victor Stinner922bc2c2015-01-15 16:29:10 +0100368 This function is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700369 """
370 if loop is None:
371 loop = events.get_event_loop()
372
Guido van Rossum48c66c32014-01-29 14:30:38 -0800373 if timeout is None:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200374 return await fut
Guido van Rossum48c66c32014-01-29 14:30:38 -0800375
Victor K4d071892017-10-05 19:04:39 +0300376 if timeout <= 0:
377 fut = ensure_future(fut, loop=loop)
378
379 if fut.done():
380 return fut.result()
381
382 fut.cancel()
383 raise futures.TimeoutError()
384
Yury Selivanov7661db62016-05-16 15:38:39 -0400385 waiter = loop.create_future()
Victor Stinner59e08022014-08-28 11:19:25 +0200386 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
387 cb = functools.partial(_release_waiter, waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700388
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400389 fut = ensure_future(fut, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700390 fut.add_done_callback(cb)
391
392 try:
Victor Stinner59e08022014-08-28 11:19:25 +0200393 # wait until the future completes or the timeout
Victor Stinner922bc2c2015-01-15 16:29:10 +0100394 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200395 await waiter
Victor Stinner922bc2c2015-01-15 16:29:10 +0100396 except futures.CancelledError:
397 fut.remove_done_callback(cb)
398 fut.cancel()
399 raise
Victor Stinner59e08022014-08-28 11:19:25 +0200400
401 if fut.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700402 return fut.result()
403 else:
404 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100405 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700406 raise futures.TimeoutError()
407 finally:
408 timeout_handle.cancel()
409
410
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200411async def _wait(fs, timeout, return_when, loop):
Victor Stinner2ba8ece2016-04-01 21:39:09 +0200412 """Internal helper for wait() and wait_for().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700413
414 The fs argument must be a collection of Futures.
415 """
416 assert fs, 'Set of Futures is empty.'
Yury Selivanov7661db62016-05-16 15:38:39 -0400417 waiter = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700418 timeout_handle = None
419 if timeout is not None:
420 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
421 counter = len(fs)
422
423 def _on_completion(f):
424 nonlocal counter
425 counter -= 1
426 if (counter <= 0 or
427 return_when == FIRST_COMPLETED or
428 return_when == FIRST_EXCEPTION and (not f.cancelled() and
429 f.exception() is not None)):
430 if timeout_handle is not None:
431 timeout_handle.cancel()
432 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200433 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700434
435 for f in fs:
436 f.add_done_callback(_on_completion)
437
438 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200439 await waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700440 finally:
441 if timeout_handle is not None:
442 timeout_handle.cancel()
443
444 done, pending = set(), set()
445 for f in fs:
446 f.remove_done_callback(_on_completion)
447 if f.done():
448 done.add(f)
449 else:
450 pending.add(f)
451 return done, pending
452
453
454# This is *not* a @coroutine! It is just an iterator (yielding Futures).
455def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800456 """Return an iterator whose values are coroutines.
457
458 When waiting for the yielded coroutines you'll get the results (or
459 exceptions!) of the original Futures (or coroutines), in the order
460 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700461
462 This differs from PEP 3148; the proper way to use this is:
463
464 for f in as_completed(fs):
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200465 result = await f # The 'await' may raise.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700466 # Use result.
467
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200468 If a timeout is specified, the 'await' will raise
Guido van Rossumb58f0532014-02-12 17:58:19 -0800469 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700470
471 Note: The futures 'f' are not necessarily members of fs.
472 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700473 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Yury Selivanov6370f342017-12-10 18:36:12 -0500474 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700475 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400476 todo = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800477 from .queues import Queue # Import here to avoid circular import problem.
478 done = Queue(loop=loop)
479 timeout_handle = None
480
481 def _on_timeout():
482 for f in todo:
483 f.remove_done_callback(_on_completion)
484 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
485 todo.clear() # Can't do todo.remove(f) in the loop.
486
487 def _on_completion(f):
488 if not todo:
489 return # _on_timeout() was here first.
490 todo.remove(f)
491 done.put_nowait(f)
492 if not todo and timeout_handle is not None:
493 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700494
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200495 async def _wait_for_one():
496 f = await done.get()
Guido van Rossumb58f0532014-02-12 17:58:19 -0800497 if f is None:
498 # Dummy value from _on_timeout().
499 raise futures.TimeoutError
500 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700501
Guido van Rossumb58f0532014-02-12 17:58:19 -0800502 for f in todo:
503 f.add_done_callback(_on_completion)
504 if todo and timeout is not None:
505 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700506 for _ in range(len(todo)):
507 yield _wait_for_one()
508
509
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200510@types.coroutine
511def __sleep0():
512 """Skip one event loop run cycle.
513
514 This is a private helper for 'asyncio.sleep()', used
515 when the 'delay' is set to 0. It uses a bare 'yield'
Yury Selivanov22feeb82018-01-24 11:31:01 -0500516 expression (which Task.__step knows how to handle)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200517 instead of creating a Future object.
518 """
519 yield
520
521
522async def sleep(delay, result=None, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700523 """Coroutine that completes after a given time (in seconds)."""
Andrew Svetlov5382c052017-12-17 16:41:30 +0200524 if delay <= 0:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200525 await __sleep0()
Yury Selivanovade04122015-11-05 14:29:04 -0500526 return result
527
Yury Selivanov7661db62016-05-16 15:38:39 -0400528 if loop is None:
529 loop = events.get_event_loop()
530 future = loop.create_future()
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500531 h = loop.call_later(delay,
532 futures._set_result_unless_cancelled,
533 future, result)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700534 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200535 return await future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700536 finally:
537 h.cancel()
538
539
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400540def ensure_future(coro_or_future, *, loop=None):
Yury Selivanov620279b2015-10-02 15:00:19 -0400541 """Wrap a coroutine or an awaitable in a future.
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400542
543 If the argument is a Future, it is returned directly.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700544 """
Miss Islington (bot)f8fdb362018-05-28 11:42:50 -0700545 if coroutines.iscoroutine(coro_or_future):
Victor Stinner896a25a2014-07-08 11:29:25 +0200546 if loop is None:
547 loop = events.get_event_loop()
548 task = loop.create_task(coro_or_future)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200549 if task._source_traceback:
550 del task._source_traceback[-1]
551 return task
Miss Islington (bot)f8fdb362018-05-28 11:42:50 -0700552 elif futures.isfuture(coro_or_future):
553 if loop is not None and loop is not futures._get_loop(coro_or_future):
554 raise ValueError('loop argument must agree with Future')
555 return coro_or_future
Victor Stinner3f438a92017-11-28 14:43:52 +0100556 elif inspect.isawaitable(coro_or_future):
Yury Selivanov620279b2015-10-02 15:00:19 -0400557 return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700558 else:
Charles Renwickae5b3262017-04-21 16:49:48 -0400559 raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
560 'required')
Yury Selivanov620279b2015-10-02 15:00:19 -0400561
562
563@coroutine
564def _wrap_awaitable(awaitable):
565 """Helper for asyncio.ensure_future().
566
567 Wraps awaitable (an object with __await__) into a coroutine
568 that will later be wrapped in a Task by ensure_future().
569 """
570 return (yield from awaitable.__await__())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700571
572
573class _GatheringFuture(futures.Future):
574 """Helper for gather().
575
576 This overrides cancel() to cancel all the children and act more
577 like Task.cancel(), which doesn't immediately mark itself as
578 cancelled.
579 """
580
581 def __init__(self, children, *, loop=None):
582 super().__init__(loop=loop)
583 self._children = children
584
585 def cancel(self):
586 if self.done():
587 return False
Yury Selivanov3d676152016-10-21 17:22:17 -0400588 ret = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700589 for child in self._children:
Yury Selivanov3d676152016-10-21 17:22:17 -0400590 if child.cancel():
591 ret = True
592 return ret
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700593
594
595def gather(*coros_or_futures, loop=None, return_exceptions=False):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500596 """Return a future aggregating results from the given coroutines/futures.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700597
Guido van Rossume3c65a72016-09-30 08:17:15 -0700598 Coroutines will be wrapped in a future and scheduled in the event
599 loop. They will not necessarily be scheduled in the same order as
600 passed in.
601
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700602 All futures must share the same event loop. If all the tasks are
603 done successfully, the returned future's result is the list of
604 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500605 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700606 exceptions in the tasks are treated the same as successful
607 results, and gathered in the result list; otherwise, the first
608 raised exception will be immediately propagated to the returned
609 future.
610
611 Cancellation: if the outer Future is cancelled, all children (that
612 have not completed yet) are also cancelled. If any child is
613 cancelled, this is treated as if it raised CancelledError --
614 the outer Future is *not* cancelled in this case. (This is to
615 prevent the cancellation of one child to cause other children to
616 be cancelled.)
617 """
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200618 if not coros_or_futures:
Yury Selivanov7661db62016-05-16 15:38:39 -0400619 if loop is None:
620 loop = events.get_event_loop()
621 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700622 outer.set_result([])
623 return outer
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200624
Yury Selivanov36c2c042017-12-19 07:19:53 -0500625 def _done_callback(fut):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700626 nonlocal nfinished
Yury Selivanov36c2c042017-12-19 07:19:53 -0500627 nfinished += 1
628
Victor Stinner3531d902015-01-09 01:42:52 +0100629 if outer.done():
630 if not fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700631 # Mark exception retrieved.
632 fut.exception()
633 return
Victor Stinner3531d902015-01-09 01:42:52 +0100634
Yury Selivanov36c2c042017-12-19 07:19:53 -0500635 if not return_exceptions:
636 if fut.cancelled():
637 # Check if 'fut' is cancelled first, as
638 # 'fut.exception()' will *raise* a CancelledError
639 # instead of returning it.
640 exc = futures.CancelledError()
641 outer.set_exception(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700642 return
Yury Selivanov36c2c042017-12-19 07:19:53 -0500643 else:
644 exc = fut.exception()
645 if exc is not None:
646 outer.set_exception(exc)
647 return
648
649 if nfinished == nfuts:
650 # All futures are done; create a list of results
651 # and set it to the 'outer' future.
652 results = []
653
654 for fut in children:
655 if fut.cancelled():
656 # Check if 'fut' is cancelled first, as
657 # 'fut.exception()' will *raise* a CancelledError
658 # instead of returning it.
659 res = futures.CancelledError()
660 else:
661 res = fut.exception()
662 if res is None:
663 res = fut.result()
664 results.append(res)
665
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700666 outer.set_result(results)
667
Yury Selivanov36c2c042017-12-19 07:19:53 -0500668 arg_to_fut = {}
669 children = []
670 nfuts = 0
671 nfinished = 0
672 for arg in coros_or_futures:
673 if arg not in arg_to_fut:
674 fut = ensure_future(arg, loop=loop)
675 if loop is None:
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500676 loop = futures._get_loop(fut)
Yury Selivanov36c2c042017-12-19 07:19:53 -0500677 if fut is not arg:
678 # 'arg' was not a Future, therefore, 'fut' is a new
679 # Future created specifically for 'arg'. Since the caller
680 # can't control it, disable the "destroy pending task"
681 # warning.
682 fut._log_destroy_pending = False
683
684 nfuts += 1
685 arg_to_fut[arg] = fut
686 fut.add_done_callback(_done_callback)
687
688 else:
689 # There's a duplicate Future object in coros_or_futures.
690 fut = arg_to_fut[arg]
691
692 children.append(fut)
693
694 outer = _GatheringFuture(children, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700695 return outer
696
697
698def shield(arg, *, loop=None):
699 """Wait for a future, shielding it from cancellation.
700
701 The statement
702
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200703 res = await shield(something())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700704
705 is exactly equivalent to the statement
706
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200707 res = await something()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700708
709 *except* that if the coroutine containing it is cancelled, the
710 task running in something() is not cancelled. From the POV of
711 something(), the cancellation did not happen. But its caller is
712 still cancelled, so the yield-from expression still raises
713 CancelledError. Note: If something() is cancelled by other means
714 this will still cancel shield().
715
716 If you want to completely ignore cancellation (not recommended)
717 you can combine shield() with a try/except clause, as follows:
718
719 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200720 res = await shield(something())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700721 except CancelledError:
722 res = None
723 """
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400724 inner = ensure_future(arg, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700725 if inner.done():
726 # Shortcut.
727 return inner
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500728 loop = futures._get_loop(inner)
Yury Selivanov7661db62016-05-16 15:38:39 -0400729 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700730
731 def _done_callback(inner):
732 if outer.cancelled():
Victor Stinner3531d902015-01-09 01:42:52 +0100733 if not inner.cancelled():
734 # Mark inner's result as retrieved.
735 inner.exception()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700736 return
Victor Stinner3531d902015-01-09 01:42:52 +0100737
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700738 if inner.cancelled():
739 outer.cancel()
740 else:
741 exc = inner.exception()
742 if exc is not None:
743 outer.set_exception(exc)
744 else:
745 outer.set_result(inner.result())
746
747 inner.add_done_callback(_done_callback)
748 return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700749
750
751def run_coroutine_threadsafe(coro, loop):
752 """Submit a coroutine object to a given event loop.
753
754 Return a concurrent.futures.Future to access the result.
755 """
756 if not coroutines.iscoroutine(coro):
757 raise TypeError('A coroutine object is required')
758 future = concurrent.futures.Future()
759
760 def callback():
Guido van Rossum601953b2015-10-05 16:20:00 -0700761 try:
762 futures._chain_future(ensure_future(coro, loop=loop), future)
763 except Exception as exc:
764 if future.set_running_or_notify_cancel():
765 future.set_exception(exc)
766 raise
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700767
768 loop.call_soon_threadsafe(callback)
769 return future
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200770
771
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500772# WeakSet containing all alive tasks.
773_all_tasks = weakref.WeakSet()
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200774
775# Dictionary containing tasks that are currently active in
776# all running event loops. {EventLoop: Task}
777_current_tasks = {}
778
779
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500780def _register_task(task):
781 """Register a new task in asyncio as executed by loop."""
782 _all_tasks.add(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200783
784
785def _enter_task(loop, task):
786 current_task = _current_tasks.get(loop)
787 if current_task is not None:
788 raise RuntimeError(f"Cannot enter into task {task!r} while another "
789 f"task {current_task!r} is being executed.")
790 _current_tasks[loop] = task
791
792
793def _leave_task(loop, task):
794 current_task = _current_tasks.get(loop)
795 if current_task is not task:
796 raise RuntimeError(f"Leaving task {task!r} does not match "
797 f"the current task {current_task!r}.")
798 del _current_tasks[loop]
799
800
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500801def _unregister_task(task):
802 """Unregister a task."""
803 _all_tasks.discard(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200804
805
806_py_register_task = _register_task
807_py_unregister_task = _unregister_task
808_py_enter_task = _enter_task
809_py_leave_task = _leave_task
810
811
812try:
813 from _asyncio import (_register_task, _unregister_task,
814 _enter_task, _leave_task,
815 _all_tasks, _current_tasks)
816except ImportError:
817 pass
818else:
819 _c_register_task = _register_task
820 _c_unregister_task = _unregister_task
821 _c_enter_task = _enter_task
822 _c_leave_task = _leave_task