blob: 6cef33d5212ee25acdaafeb6a9e6aff19deec856 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
# Names exported by a star-import of this module.
__all__ = (
    # Task class and task creation.
    'Task',
    'create_task',
    # return_when constants accepted by wait().
    'FIRST_COMPLETED',
    'FIRST_EXCEPTION',
    'ALL_COMPLETED',
    # Waiting primitives.
    'wait',
    'wait_for',
    'as_completed',
    'sleep',
    # Aggregation / wrapping helpers.
    'gather',
    'shield',
    'ensure_future',
    'run_coroutine_threadsafe',
    # Task introspection.
    'current_task',
    'all_tasks',
    # Underscore-prefixed task bookkeeping hooks.
    '_register_task',
    '_unregister_task',
    '_enter_task',
    '_leave_task',
)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import concurrent.futures
Yury Selivanovf23746a2018-01-22 19:11:18 -050013import contextvars
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import functools
15import inspect
Andrew Svetlov5f841b52017-12-09 00:23:48 +020016import types
Yury Selivanov59eb9a42015-05-11 14:48:38 -040017import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018import weakref
19
Yury Selivanova0c1ba62016-10-28 12:52:37 -040020from . import base_tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020021from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022from . import events
23from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020024from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026
def current_task(loop=None):
    """Return the task currently being executed on *loop*.

    When *loop* is None the running event loop is looked up first.  The
    result is whatever is registered for that loop in _current_tasks
    (None if nothing is registered).
    """
    target_loop = events.get_running_loop() if loop is None else loop
    return _current_tasks.get(target_loop)
32
33
def all_tasks(loop=None):
    """Return a set of all tasks for the loop.

    Only tasks that are not done yet are included.  When *loop* is None
    the running event loop is used.
    """
    if loop is None:
        loop = events.get_running_loop()
    # _all_tasks can gain or lose members while it is being iterated
    # (for example when another thread creates a task), which makes the
    # iteration raise RuntimeError.  Take a list snapshot first, retrying
    # a bounded number of times, and only then filter the snapshot.
    attempts = 0
    while True:
        try:
            tasks = list(_all_tasks)
        except RuntimeError:
            attempts += 1
            if attempts >= 1000:
                raise
        else:
            break
    return {t for t in tasks
            if futures._get_loop(t) is loop and not t.done()}
40
41
def _all_tasks_compat(loop=None):
    """Return a set of *all* tasks for the loop, including finished ones.

    Differs from all_tasks() by not filtering out completed tasks and by
    falling back to events.get_event_loop() when *loop* is None.  Used to
    implement the deprecated Task.all_tasks() class method.
    """
    if loop is None:
        loop = events.get_event_loop()
    # _all_tasks can gain or lose members while it is being iterated
    # (for example when another thread creates a task), which makes the
    # iteration raise RuntimeError.  Take a list snapshot first, retrying
    # a bounded number of times, and only then filter the snapshot.
    attempts = 0
    while True:
        try:
            tasks = list(_all_tasks)
        except RuntimeError:
            attempts += 1
            if attempts >= 1000:
                raise
        else:
            break
    return {t for t in tasks if futures._get_loop(t) is loop}
Andrew Svetlov44d1a592017-12-16 21:58:38 +020049
50
class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and __step() is scheduled;
    # - or _fut_waiter is some Future, and __step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # __wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be __wakeup().

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.

        Deprecated: emits a PendingDeprecationWarning and delegates to the
        module-level current_task().
        """
        warnings.warn("Task.current_task() is deprecated, "
                      "use asyncio.current_task() instead",
                      PendingDeprecationWarning,
                      stacklevel=2)
        if loop is None:
            loop = events.get_event_loop()
        return current_task(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.

        Deprecated: emits a PendingDeprecationWarning and delegates to
        _all_tasks_compat(), which also includes finished tasks.
        """
        warnings.warn("Task.all_tasks() is deprecated, "
                      "use asyncio.all_tasks() instead",
                      PendingDeprecationWarning,
                      stacklevel=2)
        return _all_tasks_compat(loop)

    def __init__(self, coro, *, loop=None):
        """Wrap the coroutine object *coro* and schedule its first step.

        Raises TypeError if *coro* is not a coroutine.
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the last recorded entry of the source traceback.
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        # Set by cancel() when the cancellation has to be delivered by the
        # next __step() call.
        self._must_cancel = False
        # The future this task is currently blocked on, if any.
        self._fut_waiter = None
        self._coro = coro
        # Snapshot of the current context; every __step()/__wakeup()
        # callback is scheduled with it.
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    def __del__(self):
        """Report a still-pending task to the loop's exception handler."""
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    def _repr_info(self):
        """Return the repr details computed by base_tasks._task_repr_info()."""
        return base_tasks._task_repr_info(self)

    def set_result(self, result):
        """Always raise RuntimeError: not supported on a Task."""
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        """Always raise RuntimeError: not supported on a Task."""
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        return True

    def __step(self, exc=None):
        """Run the wrapped coroutine for one step.

        Sends None into the coroutine, or throws *exc* into it when an
        exception is given (or a CancelledError when a cancellation is
        pending).  Afterwards the task is either completed, made to wait
        on the yielded future, or rescheduled via loop.call_soon().

        Raises InvalidStateError if the task is already done.
        """
        if self.done():
            raise futures.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            # A cancel() request is pending: deliver it as CancelledError
            # unless the incoming exception already is one.
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            # The coroutine finished; exc.value is used as the result.
            # super() is used because this class overrides set_result()
            # and set_exception() to raise RuntimeError.
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().set_exception(futures.CancelledError())
            else:
                super().set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            super().set_exception(exc)
        except BaseException as exc:
            # Record the exception on the task, then let it propagate.
            super().set_exception(exc)
            raise
        else:
            # The coroutine yielded a value; a non-None
            # _asyncio_future_blocking attribute marks it as future-like.
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        # Wait for the yielded future: __wakeup() will
                        # resume this task once it is done.
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            # cancel() was called during this step; pass
                            # the request on to the new waiter.
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    # Future-like object whose blocking flag is false.
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        """Done-callback of the awaited future: resume the coroutine.

        If *future* finished with an exception, that exception is passed
        to __step(); otherwise __step() is called without arguments.
        """
        try:
            future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.
301
302
# Keep a reference to the Task class as bound at this point, before the
# name "Task" is possibly rebound below.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    # The _asyncio module is not importable: "Task" stays as it is.
    pass
else:
    # Rebind "Task" to the implementation provided by _asyncio.
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task
313
314
def create_task(coro):
    """Schedule the coroutine object *coro* as a task on the running loop.

    The running event loop is looked up with events.get_running_loop()
    and the Task returned by its create_task() method is handed back.
    """
    return events.get_running_loop().create_task(coro)
322
323
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700324# wait() and as_completed() similar to those in PEP 3148.
325
# Re-export the return_when constants of concurrent.futures so that
# wait() accepts the same values.
FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED = (
    concurrent.futures.FIRST_COMPLETED,
    concurrent.futures.FIRST_EXCEPTION,
    concurrent.futures.ALL_COMPLETED,
)
329
330
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures and coroutines in *fs* to complete.

    *fs* must be a non-empty collection; a single future or coroutine
    is rejected with TypeError and an empty collection with ValueError.
    Coroutines are wrapped in Tasks.

    Returns a pair of sets of Futures: (done, pending).

    Usage:

        done, pending = await asyncio.wait(fs)

    Note: This does not raise TimeoutError!  Futures that aren't done
    when the timeout occurs are returned in the second set.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')

    accepted_modes = (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED)
    if return_when not in accepted_modes:
        raise ValueError(f'Invalid return_when value: {return_when}')

    loop = events.get_event_loop() if loop is None else loop

    # De-duplicate the inputs, then make sure each one is a future
    # attached to the chosen loop.
    wrapped = set()
    for item in set(fs):
        wrapped.add(ensure_future(item, loop=loop))

    return await _wait(wrapped, timeout, return_when, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700360
361
Victor Stinner59e08022014-08-28 11:19:25 +0200362def _release_waiter(waiter, *args):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700363 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200364 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700365
366
async def wait_for(fut, timeout, *, loop=None):
    """Wait for the single Future or coroutine to complete, with timeout.

    Coroutine will be wrapped in Task.

    Returns result of the Future or coroutine.  When a timeout occurs,
    it cancels the task, waits until the cancellation has actually been
    processed and then raises TimeoutError; the total wait time may
    therefore exceed *timeout*.  To avoid the task cancellation, wrap it
    in shield().

    If *timeout* is None, *fut* is simply awaited.  If *timeout* is zero
    or negative, the result is returned when *fut* is already done;
    otherwise it is cancelled and TimeoutError is raised immediately.

    If the wait is cancelled, the task is also cancelled.

    This function is a coroutine.
    """
    if loop is None:
        loop = events.get_event_loop()

    if timeout is None:
        return await fut

    if timeout <= 0:
        fut = ensure_future(fut, loop=loop)

        if fut.done():
            return fut.result()

        fut.cancel()
        raise futures.TimeoutError()

    # "waiter" is released either by the timer or by fut completing,
    # whichever happens first.
    waiter = loop.create_future()
    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    cb = functools.partial(_release_waiter, waiter)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(cb)

    try:
        # wait until the future completes or the timeout
        try:
            await waiter
        except futures.CancelledError:
            fut.remove_done_callback(cb)
            fut.cancel()
            raise

        if fut.done():
            return fut.result()

        # Timed out.
        fut.remove_done_callback(cb)
        # The task must not be left running after wait_for() returns, so
        # wait until the cancellation has been processed.  A separate
        # future is awaited instead of fut itself so that this wait
        # stays reliably cancellable.
        cancel_waiter = loop.create_future()
        cancel_cb = functools.partial(_release_waiter, cancel_waiter)
        fut.add_done_callback(cancel_cb)
        try:
            fut.cancel()
            await cancel_waiter
        finally:
            fut.remove_done_callback(cancel_cb)
        raise futures.TimeoutError()
    finally:
        timeout_handle.cancel()
419
420
async def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a non-empty collection of Futures.  Returns
    a (done, pending) pair of sets once the condition selected by
    *return_when* is met or the timeout expires.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    if timeout is None:
        timer = None
    else:
        timer = loop.call_later(timeout, _release_waiter, waiter)
    remaining = len(fs)

    def _on_completion(f):
        nonlocal remaining
        remaining -= 1
        if remaining > 0 and return_when != FIRST_COMPLETED:
            # Some futures are still outstanding; only a real exception
            # under FIRST_EXCEPTION may end the wait early.
            if return_when != FIRST_EXCEPTION:
                return
            if f.cancelled() or f.exception() is None:
                return
        if timer is not None:
            timer.cancel()
        if not waiter.done():
            waiter.set_result(None)

    for fut in fs:
        fut.add_done_callback(_on_completion)

    try:
        await waiter
    finally:
        if timer is not None:
            timer.cancel()

    # Detach the callback from every future and split them by state.
    done, pending = set(), set()
    for fut in fs:
        fut.remove_done_callback(_on_completion)
        (done if fut.done() else pending).add(fut)
    return done, pending
462
463
464# This is *not* a @coroutine! It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    Awaiting the yielded coroutines gives the results (or raises the
    exceptions!) of the original Futures (or coroutines), in the order
    in which and as soon as they complete.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = await f  # The 'await' may raise.
            # Use result.

    If a timeout is specified, the 'await' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if loop is None:
        loop = events.get_event_loop()

    outstanding = set()
    for item in set(fs):
        outstanding.add(ensure_future(item, loop=loop))

    from .queues import Queue  # Import here to avoid circular import problem.
    finished = Queue(loop=loop)
    timer = None

    def _on_timeout():
        # Detach from every unfinished future and queue one dummy entry
        # per future so that each remaining _wait_for_one() wakes up.
        for fut in outstanding:
            fut.remove_done_callback(_on_completion)
            finished.put_nowait(None)  # Queue a dummy value for _wait_for_one().
        outstanding.clear()  # Can't remove members while iterating above.

    def _on_completion(fut):
        if not outstanding:
            return  # _on_timeout() was here first.
        outstanding.remove(fut)
        finished.put_nowait(fut)
        if not outstanding and timer is not None:
            timer.cancel()

    async def _wait_for_one():
        fut = await finished.get()
        if fut is None:
            # Dummy value from _on_timeout().
            raise futures.TimeoutError
        return fut.result()  # May raise fut.exception().

    for fut in outstanding:
        fut.add_done_callback(_on_completion)
    if outstanding and timeout is not None:
        timer = loop.call_later(timeout, _on_timeout)
    for _ in range(len(outstanding)):
        yield _wait_for_one()
518
519
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200520@types.coroutine
521def __sleep0():
522 """Skip one event loop run cycle.
523
524 This is a private helper for 'asyncio.sleep()', used
525 when the 'delay' is set to 0. It uses a bare 'yield'
Yury Selivanov22feeb82018-01-24 11:31:01 -0500526 expression (which Task.__step knows how to handle)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200527 instead of creating a Future object.
528 """
529 yield
530
531
async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after *delay* seconds and returns *result*.

    A zero or negative delay only gives up control once via __sleep0();
    no timer is scheduled in that case.
    """
    if delay <= 0:
        await __sleep0()
        return result

    loop = events.get_event_loop() if loop is None else loop
    wakeup = loop.create_future()
    timer = loop.call_later(
        delay, futures._set_result_unless_cancelled, wakeup, result)
    try:
        return await wakeup
    finally:
        # Cancel the timer however the wait ended.
        timer.cancel()
548
549
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    A coroutine is scheduled as a task on *loop* (or on the loop from
    events.get_event_loop() when *loop* is None).  A Future is returned
    directly, after checking that an explicitly given *loop* matches its
    own.  Any other awaitable is first wrapped by _wrap_awaitable().

    Raises ValueError on a loop mismatch and TypeError for objects that
    are none of the above.
    """
    if coroutines.iscoroutine(coro_or_future):
        target_loop = events.get_event_loop() if loop is None else loop
        task = target_loop.create_task(coro_or_future)
        if task._source_traceback:
            # Drop the last recorded entry of the source traceback.
            del task._source_traceback[-1]
        return task

    if futures.isfuture(coro_or_future):
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('loop argument must agree with Future')
        return coro_or_future

    if not inspect.isawaitable(coro_or_future):
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')

    return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
Yury Selivanov620279b2015-10-02 15:00:19 -0400571
572
@coroutine
def _wrap_awaitable(awaitable):
    """Helper for asyncio.ensure_future().

    Delegates to the iterator returned by awaitable.__await__() and
    returns its final value, turning the awaitable into a coroutine
    that ensure_future() can then wrap in a Task.
    """
    outcome = yield from awaitable.__await__()
    return outcome
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700581
582
class _GatheringFuture(futures.Future):
    """Helper for gather().

    cancel() is overridden to forward the request to all the children
    instead of cancelling this future directly, which makes it behave
    more like Task.cancel(): it is not immediately marked as cancelled.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children
        self._cancel_requested = False

    def cancel(self):
        """Cancel every child; return True if any child accepted it."""
        if self.done():
            return False
        # Build the full list first so that cancel() is called on every
        # child, not only up to the first one that accepts it.
        outcomes = [child.cancel() for child in self._children]
        cancelled_any = any(outcomes)
        if cancelled_any:
            # If any child tasks were actually cancelled, we should
            # propagate the cancellation request regardless of
            # *return_exceptions* argument.  See issue 32684.
            self._cancel_requested = True
        return cancelled_any
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700609
610
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines/futures.

    Coroutines will be wrapped in a future and scheduled in the event
    loop. They will not necessarily be scheduled in the same order as
    passed in.

    All futures must share the same event loop.  If all the tasks are
    done successfully, the returned future's result is the list of
    results (in the order of the original sequence, not necessarily
    the order of results arrival).  If *return_exceptions* is True,
    exceptions in the tasks are treated the same as successful
    results, and gathered in the result list; otherwise, the first
    raised exception will be immediately propagated to the returned
    future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)

    If converting one of the arguments to a future raises, that
    exception propagates to the caller.  Children that were already
    scheduled keep running; their completion callbacks then only mark
    their exception as retrieved.
    """
    if not coros_or_futures:
        if loop is None:
            loop = events.get_event_loop()
        outer = loop.create_future()
        outer.set_result([])
        return outer

    def _done_callback(fut):
        nonlocal nfinished
        nfinished += 1

        # 'outer' is still None when gather() itself failed before the
        # outer future could be created (bpo-46672); treat that exactly
        # like an outer future that is already done.
        if outer is None or outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if not return_exceptions:
            if fut.cancelled():
                # Check if 'fut' is cancelled first, as
                # 'fut.exception()' will *raise* a CancelledError
                # instead of returning it.
                exc = futures.CancelledError()
                outer.set_exception(exc)
                return
            else:
                exc = fut.exception()
                if exc is not None:
                    outer.set_exception(exc)
                    return

        if nfinished == nfuts:
            # All futures are done; create a list of results
            # and set it to the 'outer' future.
            results = []

            for fut in children:
                if fut.cancelled():
                    # Check if 'fut' is cancelled first, as
                    # 'fut.exception()' will *raise* a CancelledError
                    # instead of returning it.
                    res = futures.CancelledError()
                else:
                    res = fut.exception()
                    if res is None:
                        res = fut.result()
                results.append(res)

            if outer._cancel_requested:
                # If gather is being cancelled we must propagate the
                # cancellation regardless of *return_exceptions* argument.
                # See issue 32684.
                outer.set_exception(futures.CancelledError())
            else:
                outer.set_result(results)

    arg_to_fut = {}
    children = []
    nfuts = 0
    nfinished = 0
    # Bind 'outer' before any callback is registered so that callbacks
    # never hit an unbound closure variable if the loop below raises.
    outer = None
    for arg in coros_or_futures:
        if arg not in arg_to_fut:
            fut = ensure_future(arg, loop=loop)
            if loop is None:
                loop = futures._get_loop(fut)
            if fut is not arg:
                # 'arg' was not a Future, therefore, 'fut' is a new
                # Future created specifically for 'arg'.  Since the caller
                # can't control it, disable the "destroy pending task"
                # warning.
                fut._log_destroy_pending = False

            nfuts += 1
            arg_to_fut[arg] = fut
            fut.add_done_callback(_done_callback)

        else:
            # There's a duplicate Future object in coros_or_futures.
            fut = arg_to_fut[arg]

        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    return outer
718
719
def shield(arg, *, loop=None):
    """Wait for a future, shielding it from cancellation.

    The statement

        res = await shield(something())

    is exactly equivalent to the statement

        res = await something()

    *except* that if the coroutine containing it is cancelled, the
    task running in something() is not cancelled.  From the POV of
    something(), the cancellation did not happen.  But its caller is
    still cancelled, so the await expression still raises
    CancelledError.  Note: If something() is cancelled by other means
    this will still cancel shield().

    If you want to completely ignore cancellation (not recommended)
    you can combine shield() with a try/except clause, as follows:

        try:
            res = await shield(something())
        except CancelledError:
            res = None
    """
    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Nothing left to protect; hand back the finished future itself.
        return inner
    outer = futures._get_loop(inner).create_future()

    def _done_callback(fut):
        if outer.cancelled():
            # The waiter went away; just mark inner's outcome as retrieved.
            if not fut.cancelled():
                fut.exception()
            return

        if fut.cancelled():
            outer.cancel()
            return

        error = fut.exception()
        if error is None:
            outer.set_result(fut.result())
        else:
            outer.set_exception(error)

    inner.add_done_callback(_done_callback)
    return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700771
772
def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    The coroutine is scheduled on *loop* through call_soon_threadsafe().
    Return a concurrent.futures.Future to access the result.

    Raise TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    future = concurrent.futures.Future()

    def callback():
        try:
            task = ensure_future(coro, loop=loop)
            futures._chain_future(task, future)
        except Exception as exc:
            # Report the failure to the waiting caller (unless it already
            # cancelled the future), then let it propagate to the loop.
            if future.set_running_or_notify_cancel():
                future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return future
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200792
793
# WeakSet containing all alive tasks.  Members are referenced weakly, so
# the set by itself does not keep a task alive.
_all_tasks = weakref.WeakSet()

# Dictionary containing tasks that are currently active in
# all running event loops.  {EventLoop: Task}
# Entries are added by _enter_task() and removed by _leave_task().
_current_tasks = {}
800
801
def _register_task(task):
    """Add *task* to the module-wide set of tasks known to asyncio."""
    _all_tasks.add(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200805
806
def _enter_task(loop, task):
    """Record *task* as the task currently executing on *loop*.

    Raise RuntimeError if another task is already recorded for *loop*.
    """
    running = _current_tasks.get(loop)
    if running is not None:
        raise RuntimeError(f"Cannot enter into task {task!r} while another "
                           f"task {running!r} is being executed.")
    _current_tasks[loop] = task
813
814
def _leave_task(loop, task):
    """Clear the record of *task* being the current task of *loop*.

    Raise RuntimeError if *task* is not the task recorded for *loop*.
    """
    running = _current_tasks.get(loop)
    if running is not task:
        raise RuntimeError(f"Leaving task {task!r} does not match "
                           f"the current task {running!r}.")
    del _current_tasks[loop]
821
822
def _unregister_task(task):
    """Remove *task* from the set of known tasks; no error if absent."""
    _all_tasks.discard(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200826
827
# Keep the pure-Python implementations reachable under _py_* names before
# the import below gets a chance to rebind the public names.
_py_register_task = _register_task
_py_unregister_task = _unregister_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task


try:
    # When _asyncio can be imported, its versions replace the module-level
    # functions and registries defined above.
    # NOTE(review): _asyncio is presumably the C accelerator module, as the
    # _c_* aliases below suggest -- confirm.
    from _asyncio import (_register_task, _unregister_task,
                          _enter_task, _leave_task,
                          _all_tasks, _current_tasks)
except ImportError:
    # _asyncio is not available; keep using the definitions above.
    pass
else:
    # Expose the imported implementations under explicit _c_* names so
    # both variants can be referenced by name.
    _c_register_task = _register_task
    _c_unregister_task = _unregister_task
    _c_enter_task = _enter_task
    _c_leave_task = _leave_task