blob: 72792a25cf55acc721346d4acacea5c235c73ee5 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
Yury Selivanov6370f342017-12-10 18:36:12 -05003__all__ = (
Andrew Svetlovf74ef452017-12-15 07:04:38 +02004 'Task', 'create_task',
Yury Selivanov6370f342017-12-10 18:36:12 -05005 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
Yury Selivanov9edad3c2017-12-11 10:03:48 -05006 'wait', 'wait_for', 'as_completed', 'sleep',
Yury Selivanov6370f342017-12-10 18:36:12 -05007 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
Andrew Svetlov44d1a592017-12-16 21:58:38 +02008 'current_task', 'all_tasks',
9 '_register_task', '_unregister_task', '_enter_task', '_leave_task',
Yury Selivanov6370f342017-12-10 18:36:12 -050010)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import concurrent.futures
Yury Selivanovf23746a2018-01-22 19:11:18 -050013import contextvars
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import functools
15import inspect
Andrew Svetlov5f841b52017-12-09 00:23:48 +020016import types
Yury Selivanov59eb9a42015-05-11 14:48:38 -040017import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018import weakref
19
Yury Selivanova0c1ba62016-10-28 12:52:37 -040020from . import base_tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020021from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022from . import events
23from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020024from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026
Andrew Svetlov44d1a592017-12-16 21:58:38 +020027def current_task(loop=None):
28 """Return a currently executed task."""
29 if loop is None:
30 loop = events.get_running_loop()
31 return _current_tasks.get(loop)
32
33
34def all_tasks(loop=None):
35 """Return a set of all tasks for the loop."""
36 if loop is None:
Miss Islington (bot)ddc613f2018-05-28 17:16:43 -070037 loop = events.get_running_loop()
38 return {t for t in _all_tasks
39 if futures._get_loop(t) is loop and not t.done()}
40
41
42def _all_tasks_compat(loop=None):
43 # Different from "all_task()" by returning *all* Tasks, including
44 # the completed ones. Used to implement deprecated "Tasks.all_task()"
45 # method.
46 if loop is None:
Andrew Svetlov44d1a592017-12-16 21:58:38 +020047 loop = events.get_event_loop()
Yury Selivanovca9b36c2017-12-23 15:04:15 -050048 return {t for t in _all_tasks if futures._get_loop(t) is loop}
Andrew Svetlov44d1a592017-12-16 21:58:38 +020049
50
Yury Selivanov0cf16f92017-12-25 10:48:15 -050051class Task(futures._PyFuture): # Inherit Python Task implementation
52 # from a Python Future implementation.
53
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070054 """A coroutine wrapped in a Future."""
55
56 # An important invariant maintained while a Task not done:
57 #
58 # - Either _fut_waiter is None, and _step() is scheduled;
59 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
60 #
61 # The only transition from the latter to the former is through
62 # _wakeup(). When _fut_waiter is not None, one of its callbacks
63 # must be _wakeup().
64
Victor Stinnerfe22e092014-12-04 23:00:13 +010065 # If False, don't log a message if the task is destroyed whereas its
66 # status is still pending
67 _log_destroy_pending = True
68
Guido van Rossum1a605ed2013-12-06 12:57:40 -080069 @classmethod
70 def current_task(cls, loop=None):
71 """Return the currently running task in an event loop or None.
72
73 By default the current task for the current event loop is returned.
74
75 None is returned when called not in the context of a Task.
76 """
Andrew Svetlov44d1a592017-12-16 21:58:38 +020077 warnings.warn("Task.current_task() is deprecated, "
78 "use asyncio.current_task() instead",
79 PendingDeprecationWarning,
80 stacklevel=2)
Guido van Rossum1a605ed2013-12-06 12:57:40 -080081 if loop is None:
82 loop = events.get_event_loop()
Andrew Svetlov44d1a592017-12-16 21:58:38 +020083 return current_task(loop)
Guido van Rossum1a605ed2013-12-06 12:57:40 -080084
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070085 @classmethod
86 def all_tasks(cls, loop=None):
87 """Return a set of all tasks for an event loop.
88
89 By default all tasks for the current event loop are returned.
90 """
Andrew Svetlov44d1a592017-12-16 21:58:38 +020091 warnings.warn("Task.all_tasks() is deprecated, "
92 "use asyncio.all_tasks() instead",
93 PendingDeprecationWarning,
94 stacklevel=2)
Miss Islington (bot)ddc613f2018-05-28 17:16:43 -070095 return _all_tasks_compat(loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070096
97 def __init__(self, coro, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070098 super().__init__(loop=loop)
Victor Stinner80f53aa2014-06-27 13:52:20 +020099 if self._source_traceback:
100 del self._source_traceback[-1]
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200101 if not coroutines.iscoroutine(coro):
102 # raise after Future.__init__(), attrs are required for __del__
103 # prevent logging for pending task in __del__
104 self._log_destroy_pending = False
105 raise TypeError(f"a coroutine was expected, got {coro!r}")
106
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700107 self._must_cancel = False
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200108 self._fut_waiter = None
109 self._coro = coro
Yury Selivanovf23746a2018-01-22 19:11:18 -0500110 self._context = contextvars.copy_context()
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200111
Yury Selivanov22feeb82018-01-24 11:31:01 -0500112 self._loop.call_soon(self.__step, context=self._context)
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500113 _register_task(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700114
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900115 def __del__(self):
116 if self._state == futures._PENDING and self._log_destroy_pending:
117 context = {
118 'task': self,
119 'message': 'Task was destroyed but it is pending!',
120 }
121 if self._source_traceback:
122 context['source_traceback'] = self._source_traceback
123 self._loop.call_exception_handler(context)
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500124 super().__del__()
Victor Stinnera02f81f2014-06-24 22:37:53 +0200125
Victor Stinner313a9802014-07-29 12:58:23 +0200126 def _repr_info(self):
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400127 return base_tasks._task_repr_info(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700128
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500129 def set_result(self, result):
130 raise RuntimeError('Task does not support set_result operation')
131
132 def set_exception(self, exception):
133 raise RuntimeError('Task does not support set_exception operation')
134
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700135 def get_stack(self, *, limit=None):
136 """Return the list of stack frames for this task's coroutine.
137
Victor Stinnerd87de832014-12-02 17:57:04 +0100138 If the coroutine is not done, this returns the stack where it is
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700139 suspended. If the coroutine has completed successfully or was
140 cancelled, this returns an empty list. If the coroutine was
141 terminated by an exception, this returns the list of traceback
142 frames.
143
144 The frames are always ordered from oldest to newest.
145
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500146 The optional limit gives the maximum number of frames to
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700147 return; by default all available frames are returned. Its
148 meaning differs depending on whether a stack or a traceback is
149 returned: the newest frames of a stack are returned, but the
150 oldest frames of a traceback are returned. (This matches the
151 behavior of the traceback module.)
152
153 For reasons beyond our control, only one stack frame is
154 returned for a suspended coroutine.
155 """
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400156 return base_tasks._task_get_stack(self, limit)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700157
158 def print_stack(self, *, limit=None, file=None):
159 """Print the stack or traceback for this task's coroutine.
160
161 This produces output similar to that of the traceback module,
162 for the frames retrieved by get_stack(). The limit argument
163 is passed to get_stack(). The file argument is an I/O stream
R David Murray8e069d52014-09-24 13:13:45 -0400164 to which the output is written; by default output is written
165 to sys.stderr.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700166 """
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400167 return base_tasks._task_print_stack(self, limit, file)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700168
169 def cancel(self):
R David Murray8e069d52014-09-24 13:13:45 -0400170 """Request that this task cancel itself.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200171
Victor Stinner8d213572014-06-02 23:06:46 +0200172 This arranges for a CancelledError to be thrown into the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200173 wrapped coroutine on the next cycle through the event loop.
174 The coroutine then has a chance to clean up or even deny
175 the request using try/except/finally.
176
R David Murray8e069d52014-09-24 13:13:45 -0400177 Unlike Future.cancel, this does not guarantee that the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200178 task will be cancelled: the exception might be caught and
R David Murray8e069d52014-09-24 13:13:45 -0400179 acted upon, delaying cancellation of the task or preventing
180 cancellation completely. The task may also return a value or
181 raise a different exception.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200182
183 Immediately after this method is called, Task.cancelled() will
184 not return True (unless the task was already cancelled). A
185 task will be marked as cancelled when the wrapped coroutine
186 terminates with a CancelledError exception (even if cancel()
187 was not called).
188 """
Yury Selivanov7ce1c6f2017-06-11 13:49:18 +0000189 self._log_traceback = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700190 if self.done():
191 return False
192 if self._fut_waiter is not None:
193 if self._fut_waiter.cancel():
194 # Leave self._fut_waiter; it may be a Task that
195 # catches and ignores the cancellation so we may have
196 # to cancel it again later.
197 return True
Yury Selivanov22feeb82018-01-24 11:31:01 -0500198 # It must be the case that self.__step is already scheduled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700199 self._must_cancel = True
200 return True
201
Yury Selivanov22feeb82018-01-24 11:31:01 -0500202 def __step(self, exc=None):
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500203 if self.done():
204 raise futures.InvalidStateError(
205 f'_step(): already done: {self!r}, {exc!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700206 if self._must_cancel:
207 if not isinstance(exc, futures.CancelledError):
208 exc = futures.CancelledError()
209 self._must_cancel = False
210 coro = self._coro
211 self._fut_waiter = None
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800212
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200213 _enter_task(self._loop, self)
Yury Selivanovd59bba82015-11-20 12:41:03 -0500214 # Call either coro.throw(exc) or coro.send(None).
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700215 try:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500216 if exc is None:
217 # We use the `send` method directly, because coroutines
218 # don't have `__iter__` and `__next__` methods.
219 result = coro.send(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700220 else:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500221 result = coro.throw(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700222 except StopIteration as exc:
INADA Naoki991adca2017-05-11 21:18:38 +0900223 if self._must_cancel:
224 # Task is cancelled right before coro stops.
225 self._must_cancel = False
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500226 super().set_exception(futures.CancelledError())
INADA Naoki991adca2017-05-11 21:18:38 +0900227 else:
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500228 super().set_result(exc.value)
Yury Selivanov4145c832016-10-09 12:19:12 -0400229 except futures.CancelledError:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700230 super().cancel() # I.e., Future.cancel(self).
231 except Exception as exc:
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500232 super().set_exception(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700233 except BaseException as exc:
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500234 super().set_exception(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700235 raise
236 else:
Guido van Rossum1140a032016-09-09 12:54:54 -0700237 blocking = getattr(result, '_asyncio_future_blocking', None)
238 if blocking is not None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700239 # Yielded Future must come from Future.__iter__().
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500240 if futures._get_loop(result) is not self._loop:
Yury Selivanov6370f342017-12-10 18:36:12 -0500241 new_exc = RuntimeError(
242 f'Task {self!r} got Future '
243 f'{result!r} attached to a different loop')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500244 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500245 self.__step, new_exc, context=self._context)
Guido van Rossum1140a032016-09-09 12:54:54 -0700246 elif blocking:
Yury Selivanov4145c832016-10-09 12:19:12 -0400247 if result is self:
Yury Selivanov6370f342017-12-10 18:36:12 -0500248 new_exc = RuntimeError(
249 f'Task cannot await on itself: {self!r}')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500250 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500251 self.__step, new_exc, context=self._context)
Yury Selivanov4145c832016-10-09 12:19:12 -0400252 else:
253 result._asyncio_future_blocking = False
Yury Selivanovf23746a2018-01-22 19:11:18 -0500254 result.add_done_callback(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500255 self.__wakeup, context=self._context)
Yury Selivanov4145c832016-10-09 12:19:12 -0400256 self._fut_waiter = result
257 if self._must_cancel:
258 if self._fut_waiter.cancel():
259 self._must_cancel = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700260 else:
Yury Selivanov6370f342017-12-10 18:36:12 -0500261 new_exc = RuntimeError(
262 f'yield was used instead of yield from '
263 f'in task {self!r} with {result!r}')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500264 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500265 self.__step, new_exc, context=self._context)
Yury Selivanov6370f342017-12-10 18:36:12 -0500266
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700267 elif result is None:
268 # Bare yield relinquishes control for one event loop iteration.
Yury Selivanov22feeb82018-01-24 11:31:01 -0500269 self._loop.call_soon(self.__step, context=self._context)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700270 elif inspect.isgenerator(result):
271 # Yielding a generator is just wrong.
Yury Selivanov6370f342017-12-10 18:36:12 -0500272 new_exc = RuntimeError(
273 f'yield was used instead of yield from for '
Miss Islington (bot)52d17412018-05-20 07:34:28 -0700274 f'generator in task {self!r} with {result!r}')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500275 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500276 self.__step, new_exc, context=self._context)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700277 else:
278 # Yielding something else is an error.
Yury Selivanov6370f342017-12-10 18:36:12 -0500279 new_exc = RuntimeError(f'Task got bad yield: {result!r}')
Yury Selivanovf23746a2018-01-22 19:11:18 -0500280 self._loop.call_soon(
Yury Selivanov22feeb82018-01-24 11:31:01 -0500281 self.__step, new_exc, context=self._context)
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800282 finally:
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200283 _leave_task(self._loop, self)
Victor Stinnerd74ac822014-03-04 23:07:08 +0100284 self = None # Needed to break cycles when an exception occurs.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700285
Yury Selivanov22feeb82018-01-24 11:31:01 -0500286 def __wakeup(self, future):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700287 try:
Yury Selivanova4afc482015-11-16 15:12:10 -0500288 future.result()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700289 except Exception as exc:
290 # This may also be a cancellation.
Yury Selivanov22feeb82018-01-24 11:31:01 -0500291 self.__step(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700292 else:
Yury Selivanova4afc482015-11-16 15:12:10 -0500293 # Don't pass the value of `future.result()` explicitly,
294 # as `Future.__iter__` and `Future.__await__` don't need it.
295 # If we call `_step(value, None)` instead of `_step()`,
296 # Python eval loop would use `.send(value)` method call,
297 # instead of `__next__()`, which is slower for futures
298 # that return non-generator iterators from their `__iter__`.
Yury Selivanov22feeb82018-01-24 11:31:01 -0500299 self.__step()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700300 self = None # Needed to break cycles when an exception occurs.
301
302
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400303_PyTask = Task
304
305
306try:
307 import _asyncio
308except ImportError:
309 pass
310else:
311 # _CTask is needed for tests.
312 Task = _CTask = _asyncio.Task
313
314
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200315def create_task(coro):
316 """Schedule the execution of a coroutine object in a spawn task.
317
318 Return a Task object.
319 """
320 loop = events.get_running_loop()
321 return loop.create_task(coro)
322
323
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700324# wait() and as_completed() similar to those in PEP 3148.
325
326FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
327FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
328ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
329
330
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200331async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700332 """Wait for the Futures and coroutines given by fs to complete.
333
Victor Stinnerdb74d982014-06-10 11:16:05 +0200334 The sequence futures must not be empty.
335
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700336 Coroutines will be wrapped in Tasks.
337
338 Returns two sets of Future: (done, pending).
339
340 Usage:
341
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200342 done, pending = await asyncio.wait(fs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700343
344 Note: This does not raise TimeoutError! Futures that aren't done
345 when the timeout occurs are returned in the second set.
346 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700347 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Yury Selivanov6370f342017-12-10 18:36:12 -0500348 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700349 if not fs:
350 raise ValueError('Set of coroutines/Futures is empty.')
Victor Stinnere931f7b2014-07-16 18:50:39 +0200351 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
Yury Selivanov6370f342017-12-10 18:36:12 -0500352 raise ValueError(f'Invalid return_when value: {return_when}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700353
354 if loop is None:
355 loop = events.get_event_loop()
356
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400357 fs = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700358
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200359 return await _wait(fs, timeout, return_when, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700360
361
Victor Stinner59e08022014-08-28 11:19:25 +0200362def _release_waiter(waiter, *args):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700363 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200364 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700365
366
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200367async def wait_for(fut, timeout, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700368 """Wait for the single Future or coroutine to complete, with timeout.
369
370 Coroutine will be wrapped in Task.
371
Victor Stinner421e49b2014-01-23 17:40:59 +0100372 Returns result of the Future or coroutine. When a timeout occurs,
373 it cancels the task and raises TimeoutError. To avoid the task
374 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700375
Victor Stinner922bc2c2015-01-15 16:29:10 +0100376 If the wait is cancelled, the task is also cancelled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700377
Victor Stinner922bc2c2015-01-15 16:29:10 +0100378 This function is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700379 """
380 if loop is None:
381 loop = events.get_event_loop()
382
Guido van Rossum48c66c32014-01-29 14:30:38 -0800383 if timeout is None:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200384 return await fut
Guido van Rossum48c66c32014-01-29 14:30:38 -0800385
Victor K4d071892017-10-05 19:04:39 +0300386 if timeout <= 0:
387 fut = ensure_future(fut, loop=loop)
388
389 if fut.done():
390 return fut.result()
391
392 fut.cancel()
393 raise futures.TimeoutError()
394
Yury Selivanov7661db62016-05-16 15:38:39 -0400395 waiter = loop.create_future()
Victor Stinner59e08022014-08-28 11:19:25 +0200396 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
397 cb = functools.partial(_release_waiter, waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700398
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400399 fut = ensure_future(fut, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700400 fut.add_done_callback(cb)
401
402 try:
Victor Stinner59e08022014-08-28 11:19:25 +0200403 # wait until the future completes or the timeout
Victor Stinner922bc2c2015-01-15 16:29:10 +0100404 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200405 await waiter
Victor Stinner922bc2c2015-01-15 16:29:10 +0100406 except futures.CancelledError:
407 fut.remove_done_callback(cb)
408 fut.cancel()
409 raise
Victor Stinner59e08022014-08-28 11:19:25 +0200410
411 if fut.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700412 return fut.result()
413 else:
414 fut.remove_done_callback(cb)
Miss Islington (bot)d8948c52018-05-29 15:37:06 -0700415 # We must ensure that the task is not running
416 # after wait_for() returns.
417 # See https://bugs.python.org/issue32751
418 await _cancel_and_wait(fut, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700419 raise futures.TimeoutError()
420 finally:
421 timeout_handle.cancel()
422
423
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200424async def _wait(fs, timeout, return_when, loop):
Miss Islington (bot)d8948c52018-05-29 15:37:06 -0700425 """Internal helper for wait().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700426
427 The fs argument must be a collection of Futures.
428 """
429 assert fs, 'Set of Futures is empty.'
Yury Selivanov7661db62016-05-16 15:38:39 -0400430 waiter = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700431 timeout_handle = None
432 if timeout is not None:
433 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
434 counter = len(fs)
435
436 def _on_completion(f):
437 nonlocal counter
438 counter -= 1
439 if (counter <= 0 or
440 return_when == FIRST_COMPLETED or
441 return_when == FIRST_EXCEPTION and (not f.cancelled() and
442 f.exception() is not None)):
443 if timeout_handle is not None:
444 timeout_handle.cancel()
445 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200446 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700447
448 for f in fs:
449 f.add_done_callback(_on_completion)
450
451 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200452 await waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700453 finally:
454 if timeout_handle is not None:
455 timeout_handle.cancel()
456
457 done, pending = set(), set()
458 for f in fs:
459 f.remove_done_callback(_on_completion)
460 if f.done():
461 done.add(f)
462 else:
463 pending.add(f)
464 return done, pending
465
466
Miss Islington (bot)d8948c52018-05-29 15:37:06 -0700467async def _cancel_and_wait(fut, loop):
468 """Cancel the *fut* future or task and wait until it completes."""
469
470 waiter = loop.create_future()
471 cb = functools.partial(_release_waiter, waiter)
472 fut.add_done_callback(cb)
473
474 try:
475 fut.cancel()
476 # We cannot wait on *fut* directly to make
477 # sure _cancel_and_wait itself is reliably cancellable.
478 await waiter
479 finally:
480 fut.remove_done_callback(cb)
481
482
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700483# This is *not* a @coroutine! It is just an iterator (yielding Futures).
484def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800485 """Return an iterator whose values are coroutines.
486
487 When waiting for the yielded coroutines you'll get the results (or
488 exceptions!) of the original Futures (or coroutines), in the order
489 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700490
491 This differs from PEP 3148; the proper way to use this is:
492
493 for f in as_completed(fs):
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200494 result = await f # The 'await' may raise.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700495 # Use result.
496
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200497 If a timeout is specified, the 'await' will raise
Guido van Rossumb58f0532014-02-12 17:58:19 -0800498 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700499
500 Note: The futures 'f' are not necessarily members of fs.
501 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700502 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Yury Selivanov6370f342017-12-10 18:36:12 -0500503 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700504 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400505 todo = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800506 from .queues import Queue # Import here to avoid circular import problem.
507 done = Queue(loop=loop)
508 timeout_handle = None
509
510 def _on_timeout():
511 for f in todo:
512 f.remove_done_callback(_on_completion)
513 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
514 todo.clear() # Can't do todo.remove(f) in the loop.
515
516 def _on_completion(f):
517 if not todo:
518 return # _on_timeout() was here first.
519 todo.remove(f)
520 done.put_nowait(f)
521 if not todo and timeout_handle is not None:
522 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700523
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200524 async def _wait_for_one():
525 f = await done.get()
Guido van Rossumb58f0532014-02-12 17:58:19 -0800526 if f is None:
527 # Dummy value from _on_timeout().
528 raise futures.TimeoutError
529 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700530
Guido van Rossumb58f0532014-02-12 17:58:19 -0800531 for f in todo:
532 f.add_done_callback(_on_completion)
533 if todo and timeout is not None:
534 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700535 for _ in range(len(todo)):
536 yield _wait_for_one()
537
538
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200539@types.coroutine
540def __sleep0():
541 """Skip one event loop run cycle.
542
543 This is a private helper for 'asyncio.sleep()', used
544 when the 'delay' is set to 0. It uses a bare 'yield'
Yury Selivanov22feeb82018-01-24 11:31:01 -0500545 expression (which Task.__step knows how to handle)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200546 instead of creating a Future object.
547 """
548 yield
549
550
551async def sleep(delay, result=None, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700552 """Coroutine that completes after a given time (in seconds)."""
Andrew Svetlov5382c052017-12-17 16:41:30 +0200553 if delay <= 0:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200554 await __sleep0()
Yury Selivanovade04122015-11-05 14:29:04 -0500555 return result
556
Yury Selivanov7661db62016-05-16 15:38:39 -0400557 if loop is None:
558 loop = events.get_event_loop()
559 future = loop.create_future()
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500560 h = loop.call_later(delay,
561 futures._set_result_unless_cancelled,
562 future, result)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700563 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200564 return await future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700565 finally:
566 h.cancel()
567
568
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400569def ensure_future(coro_or_future, *, loop=None):
Yury Selivanov620279b2015-10-02 15:00:19 -0400570 """Wrap a coroutine or an awaitable in a future.
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400571
572 If the argument is a Future, it is returned directly.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700573 """
Miss Islington (bot)f8fdb362018-05-28 11:42:50 -0700574 if coroutines.iscoroutine(coro_or_future):
Victor Stinner896a25a2014-07-08 11:29:25 +0200575 if loop is None:
576 loop = events.get_event_loop()
577 task = loop.create_task(coro_or_future)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200578 if task._source_traceback:
579 del task._source_traceback[-1]
580 return task
Miss Islington (bot)f8fdb362018-05-28 11:42:50 -0700581 elif futures.isfuture(coro_or_future):
582 if loop is not None and loop is not futures._get_loop(coro_or_future):
583 raise ValueError('loop argument must agree with Future')
584 return coro_or_future
Victor Stinner3f438a92017-11-28 14:43:52 +0100585 elif inspect.isawaitable(coro_or_future):
Yury Selivanov620279b2015-10-02 15:00:19 -0400586 return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700587 else:
Charles Renwickae5b3262017-04-21 16:49:48 -0400588 raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
589 'required')
Yury Selivanov620279b2015-10-02 15:00:19 -0400590
591
592@coroutine
593def _wrap_awaitable(awaitable):
594 """Helper for asyncio.ensure_future().
595
596 Wraps awaitable (an object with __await__) into a coroutine
597 that will later be wrapped in a Task by ensure_future().
598 """
599 return (yield from awaitable.__await__())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700600
601
602class _GatheringFuture(futures.Future):
603 """Helper for gather().
604
605 This overrides cancel() to cancel all the children and act more
606 like Task.cancel(), which doesn't immediately mark itself as
607 cancelled.
608 """
609
610 def __init__(self, children, *, loop=None):
611 super().__init__(loop=loop)
612 self._children = children
Miss Islington (bot)03643422018-05-29 15:29:12 -0700613 self._cancel_requested = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700614
615 def cancel(self):
616 if self.done():
617 return False
Yury Selivanov3d676152016-10-21 17:22:17 -0400618 ret = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700619 for child in self._children:
Yury Selivanov3d676152016-10-21 17:22:17 -0400620 if child.cancel():
621 ret = True
Miss Islington (bot)03643422018-05-29 15:29:12 -0700622 if ret:
623 # If any child tasks were actually cancelled, we should
624 # propagate the cancellation request regardless of
625 # *return_exceptions* argument. See issue 32684.
626 self._cancel_requested = True
Yury Selivanov3d676152016-10-21 17:22:17 -0400627 return ret
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700628
629
630def gather(*coros_or_futures, loop=None, return_exceptions=False):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500631 """Return a future aggregating results from the given coroutines/futures.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700632
Guido van Rossume3c65a72016-09-30 08:17:15 -0700633 Coroutines will be wrapped in a future and scheduled in the event
634 loop. They will not necessarily be scheduled in the same order as
635 passed in.
636
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700637 All futures must share the same event loop. If all the tasks are
638 done successfully, the returned future's result is the list of
639 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500640 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700641 exceptions in the tasks are treated the same as successful
642 results, and gathered in the result list; otherwise, the first
643 raised exception will be immediately propagated to the returned
644 future.
645
646 Cancellation: if the outer Future is cancelled, all children (that
647 have not completed yet) are also cancelled. If any child is
648 cancelled, this is treated as if it raised CancelledError --
649 the outer Future is *not* cancelled in this case. (This is to
650 prevent the cancellation of one child to cause other children to
651 be cancelled.)
652 """
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200653 if not coros_or_futures:
Yury Selivanov7661db62016-05-16 15:38:39 -0400654 if loop is None:
655 loop = events.get_event_loop()
656 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700657 outer.set_result([])
658 return outer
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200659
Yury Selivanov36c2c042017-12-19 07:19:53 -0500660 def _done_callback(fut):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700661 nonlocal nfinished
Yury Selivanov36c2c042017-12-19 07:19:53 -0500662 nfinished += 1
663
Victor Stinner3531d902015-01-09 01:42:52 +0100664 if outer.done():
665 if not fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700666 # Mark exception retrieved.
667 fut.exception()
668 return
Victor Stinner3531d902015-01-09 01:42:52 +0100669
Yury Selivanov36c2c042017-12-19 07:19:53 -0500670 if not return_exceptions:
671 if fut.cancelled():
672 # Check if 'fut' is cancelled first, as
673 # 'fut.exception()' will *raise* a CancelledError
674 # instead of returning it.
675 exc = futures.CancelledError()
676 outer.set_exception(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700677 return
Yury Selivanov36c2c042017-12-19 07:19:53 -0500678 else:
679 exc = fut.exception()
680 if exc is not None:
681 outer.set_exception(exc)
682 return
683
684 if nfinished == nfuts:
685 # All futures are done; create a list of results
686 # and set it to the 'outer' future.
687 results = []
688
689 for fut in children:
690 if fut.cancelled():
691 # Check if 'fut' is cancelled first, as
692 # 'fut.exception()' will *raise* a CancelledError
693 # instead of returning it.
694 res = futures.CancelledError()
695 else:
696 res = fut.exception()
697 if res is None:
698 res = fut.result()
699 results.append(res)
700
Miss Islington (bot)03643422018-05-29 15:29:12 -0700701 if outer._cancel_requested:
702 # If gather is being cancelled we must propagate the
703 # cancellation regardless of *return_exceptions* argument.
704 # See issue 32684.
705 outer.set_exception(futures.CancelledError())
706 else:
707 outer.set_result(results)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700708
Yury Selivanov36c2c042017-12-19 07:19:53 -0500709 arg_to_fut = {}
710 children = []
711 nfuts = 0
712 nfinished = 0
713 for arg in coros_or_futures:
714 if arg not in arg_to_fut:
715 fut = ensure_future(arg, loop=loop)
716 if loop is None:
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500717 loop = futures._get_loop(fut)
Yury Selivanov36c2c042017-12-19 07:19:53 -0500718 if fut is not arg:
719 # 'arg' was not a Future, therefore, 'fut' is a new
720 # Future created specifically for 'arg'. Since the caller
721 # can't control it, disable the "destroy pending task"
722 # warning.
723 fut._log_destroy_pending = False
724
725 nfuts += 1
726 arg_to_fut[arg] = fut
727 fut.add_done_callback(_done_callback)
728
729 else:
730 # There's a duplicate Future object in coros_or_futures.
731 fut = arg_to_fut[arg]
732
733 children.append(fut)
734
735 outer = _GatheringFuture(children, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700736 return outer
737
738
739def shield(arg, *, loop=None):
740 """Wait for a future, shielding it from cancellation.
741
742 The statement
743
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200744 res = await shield(something())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700745
746 is exactly equivalent to the statement
747
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200748 res = await something()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700749
750 *except* that if the coroutine containing it is cancelled, the
751 task running in something() is not cancelled. From the POV of
752 something(), the cancellation did not happen. But its caller is
753 still cancelled, so the yield-from expression still raises
754 CancelledError. Note: If something() is cancelled by other means
755 this will still cancel shield().
756
757 If you want to completely ignore cancellation (not recommended)
758 you can combine shield() with a try/except clause, as follows:
759
760 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200761 res = await shield(something())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700762 except CancelledError:
763 res = None
764 """
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400765 inner = ensure_future(arg, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700766 if inner.done():
767 # Shortcut.
768 return inner
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500769 loop = futures._get_loop(inner)
Yury Selivanov7661db62016-05-16 15:38:39 -0400770 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700771
772 def _done_callback(inner):
773 if outer.cancelled():
Victor Stinner3531d902015-01-09 01:42:52 +0100774 if not inner.cancelled():
775 # Mark inner's result as retrieved.
776 inner.exception()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700777 return
Victor Stinner3531d902015-01-09 01:42:52 +0100778
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700779 if inner.cancelled():
780 outer.cancel()
781 else:
782 exc = inner.exception()
783 if exc is not None:
784 outer.set_exception(exc)
785 else:
786 outer.set_result(inner.result())
787
788 inner.add_done_callback(_done_callback)
789 return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700790
791
792def run_coroutine_threadsafe(coro, loop):
793 """Submit a coroutine object to a given event loop.
794
795 Return a concurrent.futures.Future to access the result.
796 """
797 if not coroutines.iscoroutine(coro):
798 raise TypeError('A coroutine object is required')
799 future = concurrent.futures.Future()
800
801 def callback():
Guido van Rossum601953b2015-10-05 16:20:00 -0700802 try:
803 futures._chain_future(ensure_future(coro, loop=loop), future)
804 except Exception as exc:
805 if future.set_running_or_notify_cancel():
806 future.set_exception(exc)
807 raise
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700808
809 loop.call_soon_threadsafe(callback)
810 return future
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200811
812
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500813# WeakSet containing all alive tasks.
814_all_tasks = weakref.WeakSet()
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200815
816# Dictionary containing tasks that are currently active in
817# all running event loops. {EventLoop: Task}
818_current_tasks = {}
819
820
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500821def _register_task(task):
822 """Register a new task in asyncio as executed by loop."""
823 _all_tasks.add(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200824
825
826def _enter_task(loop, task):
827 current_task = _current_tasks.get(loop)
828 if current_task is not None:
829 raise RuntimeError(f"Cannot enter into task {task!r} while another "
830 f"task {current_task!r} is being executed.")
831 _current_tasks[loop] = task
832
833
834def _leave_task(loop, task):
835 current_task = _current_tasks.get(loop)
836 if current_task is not task:
837 raise RuntimeError(f"Leaving task {task!r} does not match "
838 f"the current task {current_task!r}.")
839 del _current_tasks[loop]
840
841
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500842def _unregister_task(task):
843 """Unregister a task."""
844 _all_tasks.discard(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200845
846
847_py_register_task = _register_task
848_py_unregister_task = _unregister_task
849_py_enter_task = _enter_task
850_py_leave_task = _leave_task
851
852
853try:
854 from _asyncio import (_register_task, _unregister_task,
855 _enter_task, _leave_task,
856 _all_tasks, _current_tasks)
857except ImportError:
858 pass
859else:
860 _c_register_task = _register_task
861 _c_unregister_task = _unregister_task
862 _c_enter_task = _enter_task
863 _c_leave_task = _leave_task