blob: 572e7073338e79041cd763d346548bfcf37958fb [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
Yury Selivanov6370f342017-12-10 18:36:12 -05003__all__ = (
Andrew Svetlovf74ef452017-12-15 07:04:38 +02004 'Task', 'create_task',
Yury Selivanov6370f342017-12-10 18:36:12 -05005 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
Yury Selivanov9edad3c2017-12-11 10:03:48 -05006 'wait', 'wait_for', 'as_completed', 'sleep',
Yury Selivanov6370f342017-12-10 18:36:12 -05007 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
Andrew Svetlov44d1a592017-12-16 21:58:38 +02008 'current_task', 'all_tasks',
9 '_register_task', '_unregister_task', '_enter_task', '_leave_task',
Yury Selivanov6370f342017-12-10 18:36:12 -050010)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import concurrent.futures
13import functools
14import inspect
Andrew Svetlov5f841b52017-12-09 00:23:48 +020015import types
Yury Selivanov59eb9a42015-05-11 14:48:38 -040016import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017import weakref
18
Yury Selivanova0c1ba62016-10-28 12:52:37 -040019from . import base_tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020020from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021from . import events
22from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020023from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
Andrew Svetlov44d1a592017-12-16 21:58:38 +020026def current_task(loop=None):
27 """Return a currently executed task."""
28 if loop is None:
29 loop = events.get_running_loop()
30 return _current_tasks.get(loop)
31
32
33def all_tasks(loop=None):
34 """Return a set of all tasks for the loop."""
35 if loop is None:
36 loop = events.get_event_loop()
Yury Selivanovca9b36c2017-12-23 15:04:15 -050037 return {t for t in _all_tasks if futures._get_loop(t) is loop}
Andrew Svetlov44d1a592017-12-16 21:58:38 +020038
39
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070040class Task(futures.Future):
41 """A coroutine wrapped in a Future."""
42
43 # An important invariant maintained while a Task not done:
44 #
45 # - Either _fut_waiter is None, and _step() is scheduled;
46 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
47 #
48 # The only transition from the latter to the former is through
49 # _wakeup(). When _fut_waiter is not None, one of its callbacks
50 # must be _wakeup().
51
Victor Stinnerfe22e092014-12-04 23:00:13 +010052 # If False, don't log a message if the task is destroyed whereas its
53 # status is still pending
54 _log_destroy_pending = True
55
Guido van Rossum1a605ed2013-12-06 12:57:40 -080056 @classmethod
57 def current_task(cls, loop=None):
58 """Return the currently running task in an event loop or None.
59
60 By default the current task for the current event loop is returned.
61
62 None is returned when called not in the context of a Task.
63 """
Andrew Svetlov44d1a592017-12-16 21:58:38 +020064 warnings.warn("Task.current_task() is deprecated, "
65 "use asyncio.current_task() instead",
66 PendingDeprecationWarning,
67 stacklevel=2)
Guido van Rossum1a605ed2013-12-06 12:57:40 -080068 if loop is None:
69 loop = events.get_event_loop()
Andrew Svetlov44d1a592017-12-16 21:58:38 +020070 return current_task(loop)
Guido van Rossum1a605ed2013-12-06 12:57:40 -080071
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070072 @classmethod
73 def all_tasks(cls, loop=None):
74 """Return a set of all tasks for an event loop.
75
76 By default all tasks for the current event loop are returned.
77 """
Andrew Svetlov44d1a592017-12-16 21:58:38 +020078 warnings.warn("Task.all_tasks() is deprecated, "
79 "use asyncio.all_tasks() instead",
80 PendingDeprecationWarning,
81 stacklevel=2)
82 return all_tasks(loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070083
84 def __init__(self, coro, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070085 super().__init__(loop=loop)
Victor Stinner80f53aa2014-06-27 13:52:20 +020086 if self._source_traceback:
87 del self._source_traceback[-1]
Andrew Svetlovf74ef452017-12-15 07:04:38 +020088 if not coroutines.iscoroutine(coro):
89 # raise after Future.__init__(), attrs are required for __del__
90 # prevent logging for pending task in __del__
91 self._log_destroy_pending = False
92 raise TypeError(f"a coroutine was expected, got {coro!r}")
93
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070094 self._must_cancel = False
Andrew Svetlovf74ef452017-12-15 07:04:38 +020095 self._fut_waiter = None
96 self._coro = coro
97
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070098 self._loop.call_soon(self._step)
Yury Selivanovca9b36c2017-12-23 15:04:15 -050099 _register_task(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700100
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900101 def __del__(self):
102 if self._state == futures._PENDING and self._log_destroy_pending:
103 context = {
104 'task': self,
105 'message': 'Task was destroyed but it is pending!',
106 }
107 if self._source_traceback:
108 context['source_traceback'] = self._source_traceback
109 self._loop.call_exception_handler(context)
110 futures.Future.__del__(self)
Victor Stinnera02f81f2014-06-24 22:37:53 +0200111
Victor Stinner313a9802014-07-29 12:58:23 +0200112 def _repr_info(self):
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400113 return base_tasks._task_repr_info(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700114
115 def get_stack(self, *, limit=None):
116 """Return the list of stack frames for this task's coroutine.
117
Victor Stinnerd87de832014-12-02 17:57:04 +0100118 If the coroutine is not done, this returns the stack where it is
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700119 suspended. If the coroutine has completed successfully or was
120 cancelled, this returns an empty list. If the coroutine was
121 terminated by an exception, this returns the list of traceback
122 frames.
123
124 The frames are always ordered from oldest to newest.
125
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500126 The optional limit gives the maximum number of frames to
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700127 return; by default all available frames are returned. Its
128 meaning differs depending on whether a stack or a traceback is
129 returned: the newest frames of a stack are returned, but the
130 oldest frames of a traceback are returned. (This matches the
131 behavior of the traceback module.)
132
133 For reasons beyond our control, only one stack frame is
134 returned for a suspended coroutine.
135 """
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400136 return base_tasks._task_get_stack(self, limit)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700137
138 def print_stack(self, *, limit=None, file=None):
139 """Print the stack or traceback for this task's coroutine.
140
141 This produces output similar to that of the traceback module,
142 for the frames retrieved by get_stack(). The limit argument
143 is passed to get_stack(). The file argument is an I/O stream
R David Murray8e069d52014-09-24 13:13:45 -0400144 to which the output is written; by default output is written
145 to sys.stderr.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700146 """
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400147 return base_tasks._task_print_stack(self, limit, file)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700148
149 def cancel(self):
R David Murray8e069d52014-09-24 13:13:45 -0400150 """Request that this task cancel itself.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200151
Victor Stinner8d213572014-06-02 23:06:46 +0200152 This arranges for a CancelledError to be thrown into the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200153 wrapped coroutine on the next cycle through the event loop.
154 The coroutine then has a chance to clean up or even deny
155 the request using try/except/finally.
156
R David Murray8e069d52014-09-24 13:13:45 -0400157 Unlike Future.cancel, this does not guarantee that the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200158 task will be cancelled: the exception might be caught and
R David Murray8e069d52014-09-24 13:13:45 -0400159 acted upon, delaying cancellation of the task or preventing
160 cancellation completely. The task may also return a value or
161 raise a different exception.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200162
163 Immediately after this method is called, Task.cancelled() will
164 not return True (unless the task was already cancelled). A
165 task will be marked as cancelled when the wrapped coroutine
166 terminates with a CancelledError exception (even if cancel()
167 was not called).
168 """
Yury Selivanov7ce1c6f2017-06-11 13:49:18 +0000169 self._log_traceback = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700170 if self.done():
171 return False
172 if self._fut_waiter is not None:
173 if self._fut_waiter.cancel():
174 # Leave self._fut_waiter; it may be a Task that
175 # catches and ignores the cancellation so we may have
176 # to cancel it again later.
177 return True
178 # It must be the case that self._step is already scheduled.
179 self._must_cancel = True
180 return True
181
Yury Selivanovd59bba82015-11-20 12:41:03 -0500182 def _step(self, exc=None):
Yury Selivanov6370f342017-12-10 18:36:12 -0500183 assert not self.done(), f'_step(): already done: {self!r}, {exc!r}'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700184 if self._must_cancel:
185 if not isinstance(exc, futures.CancelledError):
186 exc = futures.CancelledError()
187 self._must_cancel = False
188 coro = self._coro
189 self._fut_waiter = None
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800190
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200191 _enter_task(self._loop, self)
Yury Selivanovd59bba82015-11-20 12:41:03 -0500192 # Call either coro.throw(exc) or coro.send(None).
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700193 try:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500194 if exc is None:
195 # We use the `send` method directly, because coroutines
196 # don't have `__iter__` and `__next__` methods.
197 result = coro.send(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700198 else:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500199 result = coro.throw(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700200 except StopIteration as exc:
INADA Naoki991adca2017-05-11 21:18:38 +0900201 if self._must_cancel:
202 # Task is cancelled right before coro stops.
203 self._must_cancel = False
204 self.set_exception(futures.CancelledError())
205 else:
206 self.set_result(exc.value)
Yury Selivanov4145c832016-10-09 12:19:12 -0400207 except futures.CancelledError:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700208 super().cancel() # I.e., Future.cancel(self).
209 except Exception as exc:
210 self.set_exception(exc)
211 except BaseException as exc:
212 self.set_exception(exc)
213 raise
214 else:
Guido van Rossum1140a032016-09-09 12:54:54 -0700215 blocking = getattr(result, '_asyncio_future_blocking', None)
216 if blocking is not None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700217 # Yielded Future must come from Future.__iter__().
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500218 if futures._get_loop(result) is not self._loop:
Yury Selivanov6370f342017-12-10 18:36:12 -0500219 new_exc = RuntimeError(
220 f'Task {self!r} got Future '
221 f'{result!r} attached to a different loop')
222 self._loop.call_soon(self._step, new_exc)
Guido van Rossum1140a032016-09-09 12:54:54 -0700223 elif blocking:
Yury Selivanov4145c832016-10-09 12:19:12 -0400224 if result is self:
Yury Selivanov6370f342017-12-10 18:36:12 -0500225 new_exc = RuntimeError(
226 f'Task cannot await on itself: {self!r}')
227 self._loop.call_soon(self._step, new_exc)
Yury Selivanov4145c832016-10-09 12:19:12 -0400228 else:
229 result._asyncio_future_blocking = False
230 result.add_done_callback(self._wakeup)
231 self._fut_waiter = result
232 if self._must_cancel:
233 if self._fut_waiter.cancel():
234 self._must_cancel = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700235 else:
Yury Selivanov6370f342017-12-10 18:36:12 -0500236 new_exc = RuntimeError(
237 f'yield was used instead of yield from '
238 f'in task {self!r} with {result!r}')
239 self._loop.call_soon(self._step, new_exc)
240
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700241 elif result is None:
242 # Bare yield relinquishes control for one event loop iteration.
243 self._loop.call_soon(self._step)
244 elif inspect.isgenerator(result):
245 # Yielding a generator is just wrong.
Yury Selivanov6370f342017-12-10 18:36:12 -0500246 new_exc = RuntimeError(
247 f'yield was used instead of yield from for '
248 f'generator in task {self!r} with {result}')
249 self._loop.call_soon(self._step, new_exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700250 else:
251 # Yielding something else is an error.
Yury Selivanov6370f342017-12-10 18:36:12 -0500252 new_exc = RuntimeError(f'Task got bad yield: {result!r}')
253 self._loop.call_soon(self._step, new_exc)
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800254 finally:
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200255 _leave_task(self._loop, self)
Victor Stinnerd74ac822014-03-04 23:07:08 +0100256 self = None # Needed to break cycles when an exception occurs.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700257
258 def _wakeup(self, future):
259 try:
Yury Selivanova4afc482015-11-16 15:12:10 -0500260 future.result()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700261 except Exception as exc:
262 # This may also be a cancellation.
Yury Selivanovd59bba82015-11-20 12:41:03 -0500263 self._step(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700264 else:
Yury Selivanova4afc482015-11-16 15:12:10 -0500265 # Don't pass the value of `future.result()` explicitly,
266 # as `Future.__iter__` and `Future.__await__` don't need it.
267 # If we call `_step(value, None)` instead of `_step()`,
268 # Python eval loop would use `.send(value)` method call,
269 # instead of `__next__()`, which is slower for futures
270 # that return non-generator iterators from their `__iter__`.
271 self._step()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700272 self = None # Needed to break cycles when an exception occurs.
273
274
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400275_PyTask = Task
276
277
278try:
279 import _asyncio
280except ImportError:
281 pass
282else:
283 # _CTask is needed for tests.
284 Task = _CTask = _asyncio.Task
285
286
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200287def create_task(coro):
288 """Schedule the execution of a coroutine object in a spawn task.
289
290 Return a Task object.
291 """
292 loop = events.get_running_loop()
293 return loop.create_task(coro)
294
295
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700296# wait() and as_completed() similar to those in PEP 3148.
297
298FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
299FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
300ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
301
302
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200303async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700304 """Wait for the Futures and coroutines given by fs to complete.
305
Victor Stinnerdb74d982014-06-10 11:16:05 +0200306 The sequence futures must not be empty.
307
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700308 Coroutines will be wrapped in Tasks.
309
310 Returns two sets of Future: (done, pending).
311
312 Usage:
313
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200314 done, pending = await asyncio.wait(fs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700315
316 Note: This does not raise TimeoutError! Futures that aren't done
317 when the timeout occurs are returned in the second set.
318 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700319 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Yury Selivanov6370f342017-12-10 18:36:12 -0500320 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700321 if not fs:
322 raise ValueError('Set of coroutines/Futures is empty.')
Victor Stinnere931f7b2014-07-16 18:50:39 +0200323 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
Yury Selivanov6370f342017-12-10 18:36:12 -0500324 raise ValueError(f'Invalid return_when value: {return_when}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700325
326 if loop is None:
327 loop = events.get_event_loop()
328
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400329 fs = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700330
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200331 return await _wait(fs, timeout, return_when, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700332
333
Victor Stinner59e08022014-08-28 11:19:25 +0200334def _release_waiter(waiter, *args):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700335 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200336 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700337
338
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200339async def wait_for(fut, timeout, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700340 """Wait for the single Future or coroutine to complete, with timeout.
341
342 Coroutine will be wrapped in Task.
343
Victor Stinner421e49b2014-01-23 17:40:59 +0100344 Returns result of the Future or coroutine. When a timeout occurs,
345 it cancels the task and raises TimeoutError. To avoid the task
346 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700347
Victor Stinner922bc2c2015-01-15 16:29:10 +0100348 If the wait is cancelled, the task is also cancelled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700349
Victor Stinner922bc2c2015-01-15 16:29:10 +0100350 This function is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700351 """
352 if loop is None:
353 loop = events.get_event_loop()
354
Guido van Rossum48c66c32014-01-29 14:30:38 -0800355 if timeout is None:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200356 return await fut
Guido van Rossum48c66c32014-01-29 14:30:38 -0800357
Victor K4d071892017-10-05 19:04:39 +0300358 if timeout <= 0:
359 fut = ensure_future(fut, loop=loop)
360
361 if fut.done():
362 return fut.result()
363
364 fut.cancel()
365 raise futures.TimeoutError()
366
Yury Selivanov7661db62016-05-16 15:38:39 -0400367 waiter = loop.create_future()
Victor Stinner59e08022014-08-28 11:19:25 +0200368 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
369 cb = functools.partial(_release_waiter, waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700370
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400371 fut = ensure_future(fut, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700372 fut.add_done_callback(cb)
373
374 try:
Victor Stinner59e08022014-08-28 11:19:25 +0200375 # wait until the future completes or the timeout
Victor Stinner922bc2c2015-01-15 16:29:10 +0100376 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200377 await waiter
Victor Stinner922bc2c2015-01-15 16:29:10 +0100378 except futures.CancelledError:
379 fut.remove_done_callback(cb)
380 fut.cancel()
381 raise
Victor Stinner59e08022014-08-28 11:19:25 +0200382
383 if fut.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700384 return fut.result()
385 else:
386 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100387 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700388 raise futures.TimeoutError()
389 finally:
390 timeout_handle.cancel()
391
392
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200393async def _wait(fs, timeout, return_when, loop):
Victor Stinner2ba8ece2016-04-01 21:39:09 +0200394 """Internal helper for wait() and wait_for().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700395
396 The fs argument must be a collection of Futures.
397 """
398 assert fs, 'Set of Futures is empty.'
Yury Selivanov7661db62016-05-16 15:38:39 -0400399 waiter = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700400 timeout_handle = None
401 if timeout is not None:
402 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
403 counter = len(fs)
404
405 def _on_completion(f):
406 nonlocal counter
407 counter -= 1
408 if (counter <= 0 or
409 return_when == FIRST_COMPLETED or
410 return_when == FIRST_EXCEPTION and (not f.cancelled() and
411 f.exception() is not None)):
412 if timeout_handle is not None:
413 timeout_handle.cancel()
414 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200415 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700416
417 for f in fs:
418 f.add_done_callback(_on_completion)
419
420 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200421 await waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700422 finally:
423 if timeout_handle is not None:
424 timeout_handle.cancel()
425
426 done, pending = set(), set()
427 for f in fs:
428 f.remove_done_callback(_on_completion)
429 if f.done():
430 done.add(f)
431 else:
432 pending.add(f)
433 return done, pending
434
435
436# This is *not* a @coroutine! It is just an iterator (yielding Futures).
437def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800438 """Return an iterator whose values are coroutines.
439
440 When waiting for the yielded coroutines you'll get the results (or
441 exceptions!) of the original Futures (or coroutines), in the order
442 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700443
444 This differs from PEP 3148; the proper way to use this is:
445
446 for f in as_completed(fs):
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200447 result = await f # The 'await' may raise.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700448 # Use result.
449
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200450 If a timeout is specified, the 'await' will raise
Guido van Rossumb58f0532014-02-12 17:58:19 -0800451 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700452
453 Note: The futures 'f' are not necessarily members of fs.
454 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700455 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Yury Selivanov6370f342017-12-10 18:36:12 -0500456 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700457 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400458 todo = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800459 from .queues import Queue # Import here to avoid circular import problem.
460 done = Queue(loop=loop)
461 timeout_handle = None
462
463 def _on_timeout():
464 for f in todo:
465 f.remove_done_callback(_on_completion)
466 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
467 todo.clear() # Can't do todo.remove(f) in the loop.
468
469 def _on_completion(f):
470 if not todo:
471 return # _on_timeout() was here first.
472 todo.remove(f)
473 done.put_nowait(f)
474 if not todo and timeout_handle is not None:
475 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700476
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200477 async def _wait_for_one():
478 f = await done.get()
Guido van Rossumb58f0532014-02-12 17:58:19 -0800479 if f is None:
480 # Dummy value from _on_timeout().
481 raise futures.TimeoutError
482 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700483
Guido van Rossumb58f0532014-02-12 17:58:19 -0800484 for f in todo:
485 f.add_done_callback(_on_completion)
486 if todo and timeout is not None:
487 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700488 for _ in range(len(todo)):
489 yield _wait_for_one()
490
491
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200492@types.coroutine
493def __sleep0():
494 """Skip one event loop run cycle.
495
496 This is a private helper for 'asyncio.sleep()', used
497 when the 'delay' is set to 0. It uses a bare 'yield'
498 expression (which Task._step knows how to handle)
499 instead of creating a Future object.
500 """
501 yield
502
503
504async def sleep(delay, result=None, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700505 """Coroutine that completes after a given time (in seconds)."""
Andrew Svetlov5382c052017-12-17 16:41:30 +0200506 if delay <= 0:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200507 await __sleep0()
Yury Selivanovade04122015-11-05 14:29:04 -0500508 return result
509
Yury Selivanov7661db62016-05-16 15:38:39 -0400510 if loop is None:
511 loop = events.get_event_loop()
512 future = loop.create_future()
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500513 h = loop.call_later(delay,
514 futures._set_result_unless_cancelled,
515 future, result)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700516 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200517 return await future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700518 finally:
519 h.cancel()
520
521
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400522def ensure_future(coro_or_future, *, loop=None):
Yury Selivanov620279b2015-10-02 15:00:19 -0400523 """Wrap a coroutine or an awaitable in a future.
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400524
525 If the argument is a Future, it is returned directly.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700526 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700527 if futures.isfuture(coro_or_future):
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500528 if loop is not None and loop is not futures._get_loop(coro_or_future):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700529 raise ValueError('loop argument must agree with Future')
530 return coro_or_future
Victor Stinnerf951d282014-06-29 00:46:45 +0200531 elif coroutines.iscoroutine(coro_or_future):
Victor Stinner896a25a2014-07-08 11:29:25 +0200532 if loop is None:
533 loop = events.get_event_loop()
534 task = loop.create_task(coro_or_future)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200535 if task._source_traceback:
536 del task._source_traceback[-1]
537 return task
Victor Stinner3f438a92017-11-28 14:43:52 +0100538 elif inspect.isawaitable(coro_or_future):
Yury Selivanov620279b2015-10-02 15:00:19 -0400539 return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700540 else:
Charles Renwickae5b3262017-04-21 16:49:48 -0400541 raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
542 'required')
Yury Selivanov620279b2015-10-02 15:00:19 -0400543
544
545@coroutine
546def _wrap_awaitable(awaitable):
547 """Helper for asyncio.ensure_future().
548
549 Wraps awaitable (an object with __await__) into a coroutine
550 that will later be wrapped in a Task by ensure_future().
551 """
552 return (yield from awaitable.__await__())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700553
554
555class _GatheringFuture(futures.Future):
556 """Helper for gather().
557
558 This overrides cancel() to cancel all the children and act more
559 like Task.cancel(), which doesn't immediately mark itself as
560 cancelled.
561 """
562
563 def __init__(self, children, *, loop=None):
564 super().__init__(loop=loop)
565 self._children = children
566
567 def cancel(self):
568 if self.done():
569 return False
Yury Selivanov3d676152016-10-21 17:22:17 -0400570 ret = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700571 for child in self._children:
Yury Selivanov3d676152016-10-21 17:22:17 -0400572 if child.cancel():
573 ret = True
574 return ret
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700575
576
577def gather(*coros_or_futures, loop=None, return_exceptions=False):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500578 """Return a future aggregating results from the given coroutines/futures.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700579
Guido van Rossume3c65a72016-09-30 08:17:15 -0700580 Coroutines will be wrapped in a future and scheduled in the event
581 loop. They will not necessarily be scheduled in the same order as
582 passed in.
583
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700584 All futures must share the same event loop. If all the tasks are
585 done successfully, the returned future's result is the list of
586 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500587 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700588 exceptions in the tasks are treated the same as successful
589 results, and gathered in the result list; otherwise, the first
590 raised exception will be immediately propagated to the returned
591 future.
592
593 Cancellation: if the outer Future is cancelled, all children (that
594 have not completed yet) are also cancelled. If any child is
595 cancelled, this is treated as if it raised CancelledError --
596 the outer Future is *not* cancelled in this case. (This is to
597 prevent the cancellation of one child to cause other children to
598 be cancelled.)
599 """
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200600 if not coros_or_futures:
Yury Selivanov7661db62016-05-16 15:38:39 -0400601 if loop is None:
602 loop = events.get_event_loop()
603 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700604 outer.set_result([])
605 return outer
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200606
Yury Selivanov36c2c042017-12-19 07:19:53 -0500607 def _done_callback(fut):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700608 nonlocal nfinished
Yury Selivanov36c2c042017-12-19 07:19:53 -0500609 nfinished += 1
610
Victor Stinner3531d902015-01-09 01:42:52 +0100611 if outer.done():
612 if not fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700613 # Mark exception retrieved.
614 fut.exception()
615 return
Victor Stinner3531d902015-01-09 01:42:52 +0100616
Yury Selivanov36c2c042017-12-19 07:19:53 -0500617 if not return_exceptions:
618 if fut.cancelled():
619 # Check if 'fut' is cancelled first, as
620 # 'fut.exception()' will *raise* a CancelledError
621 # instead of returning it.
622 exc = futures.CancelledError()
623 outer.set_exception(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700624 return
Yury Selivanov36c2c042017-12-19 07:19:53 -0500625 else:
626 exc = fut.exception()
627 if exc is not None:
628 outer.set_exception(exc)
629 return
630
631 if nfinished == nfuts:
632 # All futures are done; create a list of results
633 # and set it to the 'outer' future.
634 results = []
635
636 for fut in children:
637 if fut.cancelled():
638 # Check if 'fut' is cancelled first, as
639 # 'fut.exception()' will *raise* a CancelledError
640 # instead of returning it.
641 res = futures.CancelledError()
642 else:
643 res = fut.exception()
644 if res is None:
645 res = fut.result()
646 results.append(res)
647
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700648 outer.set_result(results)
649
Yury Selivanov36c2c042017-12-19 07:19:53 -0500650 arg_to_fut = {}
651 children = []
652 nfuts = 0
653 nfinished = 0
654 for arg in coros_or_futures:
655 if arg not in arg_to_fut:
656 fut = ensure_future(arg, loop=loop)
657 if loop is None:
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500658 loop = futures._get_loop(fut)
Yury Selivanov36c2c042017-12-19 07:19:53 -0500659 if fut is not arg:
660 # 'arg' was not a Future, therefore, 'fut' is a new
661 # Future created specifically for 'arg'. Since the caller
662 # can't control it, disable the "destroy pending task"
663 # warning.
664 fut._log_destroy_pending = False
665
666 nfuts += 1
667 arg_to_fut[arg] = fut
668 fut.add_done_callback(_done_callback)
669
670 else:
671 # There's a duplicate Future object in coros_or_futures.
672 fut = arg_to_fut[arg]
673
674 children.append(fut)
675
676 outer = _GatheringFuture(children, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700677 return outer
678
679
680def shield(arg, *, loop=None):
681 """Wait for a future, shielding it from cancellation.
682
683 The statement
684
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200685 res = await shield(something())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700686
687 is exactly equivalent to the statement
688
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200689 res = await something()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700690
691 *except* that if the coroutine containing it is cancelled, the
692 task running in something() is not cancelled. From the POV of
693 something(), the cancellation did not happen. But its caller is
694 still cancelled, so the yield-from expression still raises
695 CancelledError. Note: If something() is cancelled by other means
696 this will still cancel shield().
697
698 If you want to completely ignore cancellation (not recommended)
699 you can combine shield() with a try/except clause, as follows:
700
701 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200702 res = await shield(something())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700703 except CancelledError:
704 res = None
705 """
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400706 inner = ensure_future(arg, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700707 if inner.done():
708 # Shortcut.
709 return inner
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500710 loop = futures._get_loop(inner)
Yury Selivanov7661db62016-05-16 15:38:39 -0400711 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700712
713 def _done_callback(inner):
714 if outer.cancelled():
Victor Stinner3531d902015-01-09 01:42:52 +0100715 if not inner.cancelled():
716 # Mark inner's result as retrieved.
717 inner.exception()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700718 return
Victor Stinner3531d902015-01-09 01:42:52 +0100719
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700720 if inner.cancelled():
721 outer.cancel()
722 else:
723 exc = inner.exception()
724 if exc is not None:
725 outer.set_exception(exc)
726 else:
727 outer.set_result(inner.result())
728
729 inner.add_done_callback(_done_callback)
730 return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700731
732
733def run_coroutine_threadsafe(coro, loop):
734 """Submit a coroutine object to a given event loop.
735
736 Return a concurrent.futures.Future to access the result.
737 """
738 if not coroutines.iscoroutine(coro):
739 raise TypeError('A coroutine object is required')
740 future = concurrent.futures.Future()
741
742 def callback():
Guido van Rossum601953b2015-10-05 16:20:00 -0700743 try:
744 futures._chain_future(ensure_future(coro, loop=loop), future)
745 except Exception as exc:
746 if future.set_running_or_notify_cancel():
747 future.set_exception(exc)
748 raise
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700749
750 loop.call_soon_threadsafe(callback)
751 return future
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200752
753
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500754# WeakSet containing all alive tasks.
755_all_tasks = weakref.WeakSet()
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200756
757# Dictionary containing tasks that are currently active in
758# all running event loops. {EventLoop: Task}
759_current_tasks = {}
760
761
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500762def _register_task(task):
763 """Register a new task in asyncio as executed by loop."""
764 _all_tasks.add(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200765
766
767def _enter_task(loop, task):
768 current_task = _current_tasks.get(loop)
769 if current_task is not None:
770 raise RuntimeError(f"Cannot enter into task {task!r} while another "
771 f"task {current_task!r} is being executed.")
772 _current_tasks[loop] = task
773
774
775def _leave_task(loop, task):
776 current_task = _current_tasks.get(loop)
777 if current_task is not task:
778 raise RuntimeError(f"Leaving task {task!r} does not match "
779 f"the current task {current_task!r}.")
780 del _current_tasks[loop]
781
782
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500783def _unregister_task(task):
784 """Unregister a task."""
785 _all_tasks.discard(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200786
787
788_py_register_task = _register_task
789_py_unregister_task = _unregister_task
790_py_enter_task = _enter_task
791_py_leave_task = _leave_task
792
793
794try:
795 from _asyncio import (_register_task, _unregister_task,
796 _enter_task, _leave_task,
797 _all_tasks, _current_tasks)
798except ImportError:
799 pass
800else:
801 _c_register_task = _register_task
802 _c_unregister_task = _unregister_task
803 _c_enter_task = _enter_task
804 _c_leave_task = _leave_task