blob: 275141c65e7e224edbd61a9f5fc26d4882452db3 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
# Names exported by a star-import of this module.  The underscore-prefixed
# task bookkeeping hooks are listed explicitly so that they are exported too.
__all__ = (
    'Task', 'create_task',
    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
    'wait', 'wait_for', 'as_completed', 'sleep',
    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
    'current_task', 'all_tasks',
    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import concurrent.futures
13import functools
14import inspect
Andrew Svetlov5f841b52017-12-09 00:23:48 +020015import types
Yury Selivanov59eb9a42015-05-11 14:48:38 -040016import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017import weakref
18
Yury Selivanova0c1ba62016-10-28 12:52:37 -040019from . import base_tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020020from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021from . import events
22from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020023from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
def current_task(loop=None):
    """Return the task currently being executed by *loop*, or None.

    When *loop* is omitted, the running loop is obtained from
    events.get_running_loop().
    """
    target_loop = events.get_running_loop() if loop is None else loop
    return _current_tasks.get(target_loop)
31
32
def all_tasks(loop=None):
    """Return the set of tasks registered for *loop*.

    When *loop* is omitted, events.get_event_loop() supplies it.
    """
    if loop is None:
        loop = events.get_event_loop()
    matching = set()
    for task, owner in _all_tasks.items():
        if owner is loop:
            matching.add(task)
    return matching
38
39
class Task(futures.Future):
    """A coroutine wrapped in a Future.

    The wrapped coroutine is driven by _step(), which is first scheduled
    with loop.call_soon() from __init__() and rescheduled (directly or via
    _wakeup()) every time the coroutine yields.
    """

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.

        Emits a PendingDeprecationWarning; the module-level current_task()
        is the replacement.
        """
        warnings.warn("Task.current_task() is deprecated, "
                      "use asyncio.current_task() instead",
                      PendingDeprecationWarning,
                      stacklevel=2)
        if loop is None:
            # Unlike the module-level function, fall back to
            # get_event_loop() rather than get_running_loop().
            loop = events.get_event_loop()
        return current_task(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.

        Emits a PendingDeprecationWarning; the module-level all_tasks()
        is the replacement.
        """
        warnings.warn("Task.all_tasks() is deprecated, "
                      "use asyncio.all_tasks() instead",
                      PendingDeprecationWarning,
                      stacklevel=2)
        return all_tasks(loop)

    def __init__(self, coro, *, loop=None):
        """Wrap *coro* and schedule its first step on the loop.

        Raises TypeError if *coro* is not a coroutine according to
        coroutines.iscoroutine().
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the last entry of the recorded source traceback.
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro

        self._loop.call_soon(self._step)
        _register_task(self._loop, self)

    def __del__(self):
        """Report a still-pending task to the loop's exception handler."""
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        futures.Future.__del__(self)

    def _repr_info(self):
        """Return the repr pieces computed by base_tasks._task_repr_info()."""
        return base_tasks._task_repr_info(self)

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, exc=None):
        """Advance the wrapped coroutine by one step.

        Sends None into the coroutine, or throws *exc* into it when *exc*
        is given (or when a cancellation is pending), then acts on what the
        coroutine returned, raised or yielded.
        """
        assert not self.done(), f'_step(): already done: {self!r}, {exc!r}'
        if self._must_cancel:
            # A cancellation was requested: make sure a CancelledError is
            # what gets thrown into the coroutine.
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                self.set_exception(futures.CancelledError())
            else:
                self.set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            # Record the exception on the task, then let it propagate.
            self.set_exception(exc)
            raise
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if result._loop is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(self._step, new_exc)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(self._step, new_exc)
                    else:
                        # Wait for the yielded future; _wakeup() resumes
                        # the task once it is done.
                        result._asyncio_future_blocking = False
                        result.add_done_callback(self._wakeup)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(self._step, new_exc)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result}')
                self._loop.call_soon(self._step, new_exc)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(self._step, new_exc)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        """Done-callback for the awaited future: resume the coroutine.

        The future's exception (if any) is forwarded to _step() so that it
        is thrown into the coroutine.
        """
        try:
            future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self._step()
        self = None  # Needed to break cycles when an exception occurs.
273
274
# Keep a handle on the pure-Python implementation: the name Task is rebound
# below when the _asyncio accelerator module can be imported.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    # No accelerator module available; Task stays the Python class.
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task
285
286
def create_task(coro):
    """Schedule the execution of a coroutine object on the running loop.

    The coroutine is handed to the create_task() method of the loop
    returned by events.get_running_loop().

    Return a Task object.
    """
    return events.get_running_loop().create_task(coro)
294
295
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700296# wait() and as_completed() similar to those in PEP 3148.
297
# The values accepted for the return_when argument of wait(); they are the
# constants of the same names from concurrent.futures.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
301
302
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures and coroutines given by fs to complete.

    fs must be a non-empty collection; passing a single future or
    coroutine raises TypeError, an empty collection or an unknown
    return_when raises ValueError.

    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = await asyncio.wait(fs)

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.
    """
    is_single_awaitable = futures.isfuture(fs) or coroutines.iscoroutine(fs)
    if is_single_awaitable:
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    accepted_modes = (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED)
    if return_when not in accepted_modes:
        raise ValueError(f'Invalid return_when value: {return_when}')

    loop = events.get_event_loop() if loop is None else loop

    # De-duplicate the inputs first, then wrap every item in a future.
    wrapped = {ensure_future(item, loop=loop) for item in set(fs)}

    outcome = await _wait(wrapped, timeout, return_when, loop)
    return outcome
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700332
333
def _release_waiter(waiter, *args):
    """Complete *waiter* with a None result unless it is already done.

    Extra positional arguments are accepted and ignored so the function
    can be used directly as a done-callback or a timer callback.
    """
    if waiter.done():
        return
    waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700337
338
async def wait_for(fut, timeout, *, loop=None):
    """Wait for the single Future or coroutine to complete, with timeout.

    Coroutine will be wrapped in Task.

    Returns result of the Future or coroutine.  When a timeout occurs,
    it cancels the task and raises TimeoutError.  To avoid the task
    cancellation, wrap it in shield().

    A timeout of None awaits *fut* directly; a timeout <= 0 returns the
    result only if *fut* is already done and otherwise times out at once.

    If the wait is cancelled, the task is also cancelled.

    This function is a coroutine.
    """
    if loop is None:
        loop = events.get_event_loop()

    if timeout is None:
        return await fut

    if timeout <= 0:
        fut = ensure_future(fut, loop=loop)
        if not fut.done():
            fut.cancel()
            raise futures.TimeoutError()
        return fut.result()

    # `wakeup` is completed either by the timer or by fut's done-callback,
    # whichever fires first.
    wakeup = loop.create_future()
    timer = loop.call_later(timeout, _release_waiter, wakeup)
    release = functools.partial(_release_waiter, wakeup)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(release)

    try:
        # wait until the future completes or the timeout
        try:
            await wakeup
        except futures.CancelledError:
            fut.remove_done_callback(release)
            fut.cancel()
            raise

        if not fut.done():
            # The timer won the race: give up on fut.
            fut.remove_done_callback(release)
            fut.cancel()
            raise futures.TimeoutError()
        return fut.result()
    finally:
        timer.cancel()
391
392
async def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a non-empty collection of Futures.

    Waits until the condition selected by *return_when* is met, or until
    *timeout* seconds have passed when *timeout* is not None, and returns
    a (done, pending) pair of sets partitioning *fs*.

    The completion callbacks attached to the futures are always removed
    again, including when the wait itself is cancelled or fails.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    timeout_handle = None
    if timeout is not None:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)

    def _on_completion(f):
        nonlocal counter
        counter -= 1
        if (counter <= 0 or
            return_when == FIRST_COMPLETED or
            return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                f.exception() is not None)):
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        await waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()
        # Detach from the futures on every exit path; otherwise a cancelled
        # wait would leave _on_completion (and everything it references)
        # registered on each future.
        for f in fs:
            f.remove_done_callback(_on_completion)

    done, pending = set(), set()
    for f in fs:
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending
434
435
436# This is *not* a @coroutine! It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    When waiting for the yielded coroutines you'll get the results (or
    exceptions!) of the original Futures (or coroutines), in the order
    in which and as soon as they complete.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = await f  # The 'await' may raise.
            # Use result.

    If a timeout is specified, the 'await' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if loop is None:
        loop = events.get_event_loop()
    outstanding = {ensure_future(item, loop=loop) for item in set(fs)}
    from .queues import Queue  # Import here to avoid circular import problem.
    finished = Queue(loop=loop)
    timer = None

    def _on_timeout():
        # One dummy entry per unfinished future, so that every remaining
        # _wait_for_one() call wakes up and raises.
        for fut in outstanding:
            fut.remove_done_callback(_on_completion)
            finished.put_nowait(None)
        outstanding.clear()  # Can't remove items while iterating above.

    def _on_completion(fut):
        if not outstanding:
            return  # _on_timeout() was here first.
        outstanding.remove(fut)
        finished.put_nowait(fut)
        if timer is not None and not outstanding:
            timer.cancel()

    async def _wait_for_one():
        fut = await finished.get()
        if fut is not None:
            return fut.result()  # May raise fut.exception().
        # Dummy value from _on_timeout().
        raise futures.TimeoutError

    for fut in outstanding:
        fut.add_done_callback(_on_completion)
    if outstanding and timeout is not None:
        timer = loop.call_later(timeout, _on_timeout)
    for _ in range(len(outstanding)):
        yield _wait_for_one()
490
491
@types.coroutine
def __sleep0():
    """Give up control for a single event loop iteration.

    Private helper used by sleep() for non-positive delays: a bare
    'yield' is all that is needed, so no Future object has to be
    created (Task._step reschedules itself when it receives None).
    """
    yield
502
503
async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    *result* is returned to the caller once the delay has elapsed.  A
    delay <= 0 only yields to the event loop once.
    """
    if delay <= 0:
        await __sleep0()
        return result

    if loop is None:
        loop = events.get_event_loop()
    waiter = loop.create_future()
    timer = waiter._loop.call_later(
        delay, futures._set_result_unless_cancelled, waiter, result)
    try:
        return await waiter
    finally:
        # Also covers the case where the sleep itself is cancelled.
        timer.cancel()
520
521
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly (a ValueError
    is raised when an explicit *loop* differs from the Future's loop).
    A coroutine is scheduled as a task on the loop; any other awaitable
    is first wrapped in a coroutine.  Anything else raises TypeError.
    """
    if futures.isfuture(coro_or_future):
        if not (loop is None or loop is coro_or_future._loop):
            raise ValueError('loop argument must agree with Future')
        return coro_or_future

    if coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task

    if inspect.isawaitable(coro_or_future):
        wrapper = _wrap_awaitable(coro_or_future)
        return ensure_future(wrapper, loop=loop)

    raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                    'required')
Yury Selivanov620279b2015-10-02 15:00:19 -0400543
544
@coroutine
def _wrap_awaitable(awaitable):
    """Helper for asyncio.ensure_future().

    Turns an object with an __await__ method into a coroutine by
    delegating to the iterator that __await__() returns; ensure_future()
    then wraps that coroutine in a Task.
    """
    outcome = yield from awaitable.__await__()
    return outcome
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700553
554
class _GatheringFuture(futures.Future):
    """Helper for gather().

    cancel() is overridden to forward the request to every child instead
    of marking this future as cancelled right away, which makes it behave
    more like Task.cancel().
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children

    def cancel(self):
        """Cancel all children; return True if any child accepted it."""
        if self.done():
            return False
        # Build the full list first so that every child gets cancelled,
        # not just the ones up to the first success.
        outcomes = [child.cancel() for child in self._children]
        return any(outcomes)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700575
576
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines
    or futures.

    Coroutines will be wrapped in a future and scheduled in the event
    loop. They will not necessarily be scheduled in the same order as
    passed in.

    All futures must share the same event loop.  If all the tasks are
    done successfully, the returned future's result is the list of
    results (in the order of the original sequence, not necessarily
    the order of results arrival).  If *return_exceptions* is True,
    exceptions in the tasks are treated the same as successful
    results, and gathered in the result list; otherwise, the first
    raised exception will be immediately propagated to the returned
    future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)
    """
    if not coros_or_futures:
        # Nothing to wait for: hand back an already-completed future.
        if loop is None:
            loop = events.get_event_loop()
        empty = loop.create_future()
        empty.set_result([])
        return empty

    # Map each distinct argument to its future so that duplicates share one.
    future_for = {}
    for item in set(coros_or_futures):
        if futures.isfuture(item):
            child = item
            if loop is None:
                loop = child._loop
            elif child._loop is not loop:
                raise ValueError("futures are tied to different event loops")
        else:
            child = ensure_future(item, loop=loop)
            if loop is None:
                loop = child._loop
            # The caller cannot control this future, the "destroy pending task"
            # warning should not be emitted.
            child._log_destroy_pending = False
        future_for[item] = child

    children = [future_for[item] for item in coros_or_futures]
    total = len(children)
    outer = _GatheringFuture(children, loop=loop)
    completed = 0
    results = [None] * total

    def _done_callback(i, fut):
        nonlocal completed
        if outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if fut.cancelled():
            outcome, is_error = futures.CancelledError(), True
        elif fut._exception is not None:
            # Calling exception() marks it as retrieved.
            outcome, is_error = fut.exception(), True
        else:
            outcome, is_error = fut._result, False

        if is_error and not return_exceptions:
            outer.set_exception(outcome)
            return

        results[i] = outcome
        completed += 1
        if completed == total:
            outer.set_result(results)

    for index, child in enumerate(children):
        child.add_done_callback(functools.partial(_done_callback, index))
    return outer
659
660
def shield(arg, *, loop=None):
    """Wait for a future, shielding it from cancellation.

    The statement

        res = await shield(something())

    is exactly equivalent to the statement

        res = await something()

    *except* that if the coroutine containing it is cancelled, the
    task running in something() is not cancelled.  From the POV of
    something(), the cancellation did not happen.  But its caller is
    still cancelled, so the yield-from expression still raises
    CancelledError.  Note: If something() is cancelled by other means
    this will still cancel shield().

    If you want to completely ignore cancellation (not recommended)
    you can combine shield() with a try/except clause, as follows:

        try:
            res = await shield(something())
        except CancelledError:
            res = None
    """
    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Shortcut.
        return inner
    outer = inner._loop.create_future()

    def _done_callback(finished):
        if outer.cancelled():
            if not finished.cancelled():
                # Mark inner's result as retrieved.
                finished.exception()
            return

        if finished.cancelled():
            outer.cancel()
            return

        # Mirror the inner outcome onto the outer future.
        exc = finished.exception()
        if exc is None:
            outer.set_result(finished.result())
        else:
            outer.set_exception(exc)

    inner.add_done_callback(_done_callback)
    return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700712
713
def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    The coroutine is scheduled through loop.call_soon_threadsafe().

    Return a concurrent.futures.Future to access the result.
    Raise TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    result_future = concurrent.futures.Future()

    def callback():
        try:
            task = ensure_future(coro, loop=loop)
            futures._chain_future(task, result_future)
        except Exception as exc:
            # Report the failure to the waiting thread (unless it already
            # cancelled the future), then let the loop see it as well.
            if result_future.set_running_or_notify_cancel():
                result_future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return result_future
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200733
734
# WeakKeyDictionary of {Task: EventLoop} containing all tasks alive.
# Tasks are held as weak keys so that an entry is removed when its task
# is garbage collected; the loop is stored as the value so that the
# private task._loop attribute does not have to be accessed.
_all_tasks = weakref.WeakKeyDictionary()

# Dictionary containing tasks that are currently active in
# all running event loops.  {EventLoop: Task}
_current_tasks = {}
744
745
def _register_task(loop, task):
    """Register a new task in asyncio as executed by loop.

    The task is stored as a key of _all_tasks with *loop* as its value.

    Returns None.
    """
    _all_tasks[task] = loop
752
753
def _enter_task(loop, task):
    """Mark *task* as the one currently being executed by *loop*.

    Raises RuntimeError if the loop already has a current task.
    """
    current_task = _current_tasks.get(loop)
    if current_task is None:
        _current_tasks[loop] = task
        return
    raise RuntimeError(f"Cannot enter into task {task!r} while another "
                       f"task {current_task!r} is being executed.")
760
761
def _leave_task(loop, task):
    """Clear the current task of *loop*, which must be *task*.

    Raises RuntimeError if *task* is not the loop's current task.
    """
    current_task = _current_tasks.get(loop)
    if current_task is task:
        del _current_tasks[loop]
        return
    raise RuntimeError(f"Leaving task {task!r} does not match "
                       f"the current task {current_task!r}.")
768
769
def _unregister_task(loop, task):
    """Remove *task* from _all_tasks; a task that is not present is ignored.

    The *loop* argument is not used by this implementation.
    """
    _all_tasks.pop(task, None)
772
773
# Keep references to the pure-Python implementations; the plain names may
# be rebound by the import from _asyncio below.
_py_register_task = _register_task
_py_unregister_task = _unregister_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task


try:
    from _asyncio import (_register_task, _unregister_task,
                          _enter_task, _leave_task,
                          _all_tasks, _current_tasks)
except ImportError:
    # _asyncio is unavailable or lacks these names: keep the Python versions.
    pass
else:
    # Aliases for the implementations imported from _asyncio.
    _c_register_task = _register_task
    _c_unregister_task = _unregister_task
    _c_enter_task = _enter_task
    _c_leave_task = _leave_task