blob: b11808853e24fe88a59b54a033c9c775c53e7f9b [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
Yury Selivanov6370f342017-12-10 18:36:12 -05003__all__ = (
Andrew Svetlovf74ef452017-12-15 07:04:38 +02004 'Task', 'create_task',
Yury Selivanov6370f342017-12-10 18:36:12 -05005 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
Yury Selivanov9edad3c2017-12-11 10:03:48 -05006 'wait', 'wait_for', 'as_completed', 'sleep',
Yury Selivanov6370f342017-12-10 18:36:12 -05007 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
Andrew Svetlov44d1a592017-12-16 21:58:38 +02008 'current_task', 'all_tasks',
9 '_register_task', '_unregister_task', '_enter_task', '_leave_task',
Yury Selivanov6370f342017-12-10 18:36:12 -050010)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import concurrent.futures
13import functools
14import inspect
Andrew Svetlov5f841b52017-12-09 00:23:48 +020015import types
Yury Selivanov59eb9a42015-05-11 14:48:38 -040016import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017import weakref
18
Yury Selivanova0c1ba62016-10-28 12:52:37 -040019from . import base_tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020020from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021from . import events
22from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020023from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
Andrew Svetlov44d1a592017-12-16 21:58:38 +020026def current_task(loop=None):
27 """Return a currently executed task."""
28 if loop is None:
29 loop = events.get_running_loop()
30 return _current_tasks.get(loop)
31
32
33def all_tasks(loop=None):
34 """Return a set of all tasks for the loop."""
35 if loop is None:
36 loop = events.get_event_loop()
Yury Selivanovca9b36c2017-12-23 15:04:15 -050037 return {t for t in _all_tasks if futures._get_loop(t) is loop}
Andrew Svetlov44d1a592017-12-16 21:58:38 +020038
39
Yury Selivanov0cf16f92017-12-25 10:48:15 -050040class Task(futures._PyFuture): # Inherit Python Task implementation
41 # from a Python Future implementation.
42
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070043 """A coroutine wrapped in a Future."""
44
45 # An important invariant maintained while a Task not done:
46 #
47 # - Either _fut_waiter is None, and _step() is scheduled;
48 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
49 #
50 # The only transition from the latter to the former is through
51 # _wakeup(). When _fut_waiter is not None, one of its callbacks
52 # must be _wakeup().
53
Victor Stinnerfe22e092014-12-04 23:00:13 +010054 # If False, don't log a message if the task is destroyed whereas its
55 # status is still pending
56 _log_destroy_pending = True
57
Guido van Rossum1a605ed2013-12-06 12:57:40 -080058 @classmethod
59 def current_task(cls, loop=None):
60 """Return the currently running task in an event loop or None.
61
62 By default the current task for the current event loop is returned.
63
64 None is returned when called not in the context of a Task.
65 """
Andrew Svetlov44d1a592017-12-16 21:58:38 +020066 warnings.warn("Task.current_task() is deprecated, "
67 "use asyncio.current_task() instead",
68 PendingDeprecationWarning,
69 stacklevel=2)
Guido van Rossum1a605ed2013-12-06 12:57:40 -080070 if loop is None:
71 loop = events.get_event_loop()
Andrew Svetlov44d1a592017-12-16 21:58:38 +020072 return current_task(loop)
Guido van Rossum1a605ed2013-12-06 12:57:40 -080073
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070074 @classmethod
75 def all_tasks(cls, loop=None):
76 """Return a set of all tasks for an event loop.
77
78 By default all tasks for the current event loop are returned.
79 """
Andrew Svetlov44d1a592017-12-16 21:58:38 +020080 warnings.warn("Task.all_tasks() is deprecated, "
81 "use asyncio.all_tasks() instead",
82 PendingDeprecationWarning,
83 stacklevel=2)
84 return all_tasks(loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070085
86 def __init__(self, coro, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070087 super().__init__(loop=loop)
Victor Stinner80f53aa2014-06-27 13:52:20 +020088 if self._source_traceback:
89 del self._source_traceback[-1]
Andrew Svetlovf74ef452017-12-15 07:04:38 +020090 if not coroutines.iscoroutine(coro):
91 # raise after Future.__init__(), attrs are required for __del__
92 # prevent logging for pending task in __del__
93 self._log_destroy_pending = False
94 raise TypeError(f"a coroutine was expected, got {coro!r}")
95
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070096 self._must_cancel = False
Andrew Svetlovf74ef452017-12-15 07:04:38 +020097 self._fut_waiter = None
98 self._coro = coro
99
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700100 self._loop.call_soon(self._step)
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500101 _register_task(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700102
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900103 def __del__(self):
104 if self._state == futures._PENDING and self._log_destroy_pending:
105 context = {
106 'task': self,
107 'message': 'Task was destroyed but it is pending!',
108 }
109 if self._source_traceback:
110 context['source_traceback'] = self._source_traceback
111 self._loop.call_exception_handler(context)
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500112 super().__del__()
Victor Stinnera02f81f2014-06-24 22:37:53 +0200113
Victor Stinner313a9802014-07-29 12:58:23 +0200114 def _repr_info(self):
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400115 return base_tasks._task_repr_info(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700116
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500117 def set_result(self, result):
118 raise RuntimeError('Task does not support set_result operation')
119
120 def set_exception(self, exception):
121 raise RuntimeError('Task does not support set_exception operation')
122
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700123 def get_stack(self, *, limit=None):
124 """Return the list of stack frames for this task's coroutine.
125
Victor Stinnerd87de832014-12-02 17:57:04 +0100126 If the coroutine is not done, this returns the stack where it is
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700127 suspended. If the coroutine has completed successfully or was
128 cancelled, this returns an empty list. If the coroutine was
129 terminated by an exception, this returns the list of traceback
130 frames.
131
132 The frames are always ordered from oldest to newest.
133
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500134 The optional limit gives the maximum number of frames to
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700135 return; by default all available frames are returned. Its
136 meaning differs depending on whether a stack or a traceback is
137 returned: the newest frames of a stack are returned, but the
138 oldest frames of a traceback are returned. (This matches the
139 behavior of the traceback module.)
140
141 For reasons beyond our control, only one stack frame is
142 returned for a suspended coroutine.
143 """
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400144 return base_tasks._task_get_stack(self, limit)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700145
146 def print_stack(self, *, limit=None, file=None):
147 """Print the stack or traceback for this task's coroutine.
148
149 This produces output similar to that of the traceback module,
150 for the frames retrieved by get_stack(). The limit argument
151 is passed to get_stack(). The file argument is an I/O stream
R David Murray8e069d52014-09-24 13:13:45 -0400152 to which the output is written; by default output is written
153 to sys.stderr.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700154 """
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400155 return base_tasks._task_print_stack(self, limit, file)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700156
157 def cancel(self):
R David Murray8e069d52014-09-24 13:13:45 -0400158 """Request that this task cancel itself.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200159
Victor Stinner8d213572014-06-02 23:06:46 +0200160 This arranges for a CancelledError to be thrown into the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200161 wrapped coroutine on the next cycle through the event loop.
162 The coroutine then has a chance to clean up or even deny
163 the request using try/except/finally.
164
R David Murray8e069d52014-09-24 13:13:45 -0400165 Unlike Future.cancel, this does not guarantee that the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200166 task will be cancelled: the exception might be caught and
R David Murray8e069d52014-09-24 13:13:45 -0400167 acted upon, delaying cancellation of the task or preventing
168 cancellation completely. The task may also return a value or
169 raise a different exception.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200170
171 Immediately after this method is called, Task.cancelled() will
172 not return True (unless the task was already cancelled). A
173 task will be marked as cancelled when the wrapped coroutine
174 terminates with a CancelledError exception (even if cancel()
175 was not called).
176 """
Yury Selivanov7ce1c6f2017-06-11 13:49:18 +0000177 self._log_traceback = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700178 if self.done():
179 return False
180 if self._fut_waiter is not None:
181 if self._fut_waiter.cancel():
182 # Leave self._fut_waiter; it may be a Task that
183 # catches and ignores the cancellation so we may have
184 # to cancel it again later.
185 return True
186 # It must be the case that self._step is already scheduled.
187 self._must_cancel = True
188 return True
189
Yury Selivanovd59bba82015-11-20 12:41:03 -0500190 def _step(self, exc=None):
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500191 if self.done():
192 raise futures.InvalidStateError(
193 f'_step(): already done: {self!r}, {exc!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700194 if self._must_cancel:
195 if not isinstance(exc, futures.CancelledError):
196 exc = futures.CancelledError()
197 self._must_cancel = False
198 coro = self._coro
199 self._fut_waiter = None
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800200
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200201 _enter_task(self._loop, self)
Yury Selivanovd59bba82015-11-20 12:41:03 -0500202 # Call either coro.throw(exc) or coro.send(None).
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700203 try:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500204 if exc is None:
205 # We use the `send` method directly, because coroutines
206 # don't have `__iter__` and `__next__` methods.
207 result = coro.send(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700208 else:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500209 result = coro.throw(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700210 except StopIteration as exc:
INADA Naoki991adca2017-05-11 21:18:38 +0900211 if self._must_cancel:
212 # Task is cancelled right before coro stops.
213 self._must_cancel = False
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500214 super().set_exception(futures.CancelledError())
INADA Naoki991adca2017-05-11 21:18:38 +0900215 else:
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500216 super().set_result(exc.value)
Yury Selivanov4145c832016-10-09 12:19:12 -0400217 except futures.CancelledError:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700218 super().cancel() # I.e., Future.cancel(self).
219 except Exception as exc:
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500220 super().set_exception(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700221 except BaseException as exc:
Yury Selivanov0cf16f92017-12-25 10:48:15 -0500222 super().set_exception(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700223 raise
224 else:
Guido van Rossum1140a032016-09-09 12:54:54 -0700225 blocking = getattr(result, '_asyncio_future_blocking', None)
226 if blocking is not None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700227 # Yielded Future must come from Future.__iter__().
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500228 if futures._get_loop(result) is not self._loop:
Yury Selivanov6370f342017-12-10 18:36:12 -0500229 new_exc = RuntimeError(
230 f'Task {self!r} got Future '
231 f'{result!r} attached to a different loop')
232 self._loop.call_soon(self._step, new_exc)
Guido van Rossum1140a032016-09-09 12:54:54 -0700233 elif blocking:
Yury Selivanov4145c832016-10-09 12:19:12 -0400234 if result is self:
Yury Selivanov6370f342017-12-10 18:36:12 -0500235 new_exc = RuntimeError(
236 f'Task cannot await on itself: {self!r}')
237 self._loop.call_soon(self._step, new_exc)
Yury Selivanov4145c832016-10-09 12:19:12 -0400238 else:
239 result._asyncio_future_blocking = False
240 result.add_done_callback(self._wakeup)
241 self._fut_waiter = result
242 if self._must_cancel:
243 if self._fut_waiter.cancel():
244 self._must_cancel = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700245 else:
Yury Selivanov6370f342017-12-10 18:36:12 -0500246 new_exc = RuntimeError(
247 f'yield was used instead of yield from '
248 f'in task {self!r} with {result!r}')
249 self._loop.call_soon(self._step, new_exc)
250
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700251 elif result is None:
252 # Bare yield relinquishes control for one event loop iteration.
253 self._loop.call_soon(self._step)
254 elif inspect.isgenerator(result):
255 # Yielding a generator is just wrong.
Yury Selivanov6370f342017-12-10 18:36:12 -0500256 new_exc = RuntimeError(
257 f'yield was used instead of yield from for '
258 f'generator in task {self!r} with {result}')
259 self._loop.call_soon(self._step, new_exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700260 else:
261 # Yielding something else is an error.
Yury Selivanov6370f342017-12-10 18:36:12 -0500262 new_exc = RuntimeError(f'Task got bad yield: {result!r}')
263 self._loop.call_soon(self._step, new_exc)
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800264 finally:
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200265 _leave_task(self._loop, self)
Victor Stinnerd74ac822014-03-04 23:07:08 +0100266 self = None # Needed to break cycles when an exception occurs.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700267
268 def _wakeup(self, future):
269 try:
Yury Selivanova4afc482015-11-16 15:12:10 -0500270 future.result()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700271 except Exception as exc:
272 # This may also be a cancellation.
Yury Selivanovd59bba82015-11-20 12:41:03 -0500273 self._step(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700274 else:
Yury Selivanova4afc482015-11-16 15:12:10 -0500275 # Don't pass the value of `future.result()` explicitly,
276 # as `Future.__iter__` and `Future.__await__` don't need it.
277 # If we call `_step(value, None)` instead of `_step()`,
278 # Python eval loop would use `.send(value)` method call,
279 # instead of `__next__()`, which is slower for futures
280 # that return non-generator iterators from their `__iter__`.
281 self._step()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700282 self = None # Needed to break cycles when an exception occurs.
283
284
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400285_PyTask = Task
286
287
288try:
289 import _asyncio
290except ImportError:
291 pass
292else:
293 # _CTask is needed for tests.
294 Task = _CTask = _asyncio.Task
295
296
Andrew Svetlovf74ef452017-12-15 07:04:38 +0200297def create_task(coro):
298 """Schedule the execution of a coroutine object in a spawn task.
299
300 Return a Task object.
301 """
302 loop = events.get_running_loop()
303 return loop.create_task(coro)
304
305
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700306# wait() and as_completed() similar to those in PEP 3148.
307
308FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
309FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
310ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
311
312
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200313async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700314 """Wait for the Futures and coroutines given by fs to complete.
315
Victor Stinnerdb74d982014-06-10 11:16:05 +0200316 The sequence futures must not be empty.
317
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700318 Coroutines will be wrapped in Tasks.
319
320 Returns two sets of Future: (done, pending).
321
322 Usage:
323
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200324 done, pending = await asyncio.wait(fs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700325
326 Note: This does not raise TimeoutError! Futures that aren't done
327 when the timeout occurs are returned in the second set.
328 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700329 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Yury Selivanov6370f342017-12-10 18:36:12 -0500330 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700331 if not fs:
332 raise ValueError('Set of coroutines/Futures is empty.')
Victor Stinnere931f7b2014-07-16 18:50:39 +0200333 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
Yury Selivanov6370f342017-12-10 18:36:12 -0500334 raise ValueError(f'Invalid return_when value: {return_when}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700335
336 if loop is None:
337 loop = events.get_event_loop()
338
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400339 fs = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700340
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200341 return await _wait(fs, timeout, return_when, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700342
343
Victor Stinner59e08022014-08-28 11:19:25 +0200344def _release_waiter(waiter, *args):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700345 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200346 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700347
348
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200349async def wait_for(fut, timeout, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700350 """Wait for the single Future or coroutine to complete, with timeout.
351
352 Coroutine will be wrapped in Task.
353
Victor Stinner421e49b2014-01-23 17:40:59 +0100354 Returns result of the Future or coroutine. When a timeout occurs,
355 it cancels the task and raises TimeoutError. To avoid the task
356 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700357
Victor Stinner922bc2c2015-01-15 16:29:10 +0100358 If the wait is cancelled, the task is also cancelled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700359
Victor Stinner922bc2c2015-01-15 16:29:10 +0100360 This function is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700361 """
362 if loop is None:
363 loop = events.get_event_loop()
364
Guido van Rossum48c66c32014-01-29 14:30:38 -0800365 if timeout is None:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200366 return await fut
Guido van Rossum48c66c32014-01-29 14:30:38 -0800367
Victor K4d071892017-10-05 19:04:39 +0300368 if timeout <= 0:
369 fut = ensure_future(fut, loop=loop)
370
371 if fut.done():
372 return fut.result()
373
374 fut.cancel()
375 raise futures.TimeoutError()
376
Yury Selivanov7661db62016-05-16 15:38:39 -0400377 waiter = loop.create_future()
Victor Stinner59e08022014-08-28 11:19:25 +0200378 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
379 cb = functools.partial(_release_waiter, waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700380
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400381 fut = ensure_future(fut, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700382 fut.add_done_callback(cb)
383
384 try:
Victor Stinner59e08022014-08-28 11:19:25 +0200385 # wait until the future completes or the timeout
Victor Stinner922bc2c2015-01-15 16:29:10 +0100386 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200387 await waiter
Victor Stinner922bc2c2015-01-15 16:29:10 +0100388 except futures.CancelledError:
389 fut.remove_done_callback(cb)
390 fut.cancel()
391 raise
Victor Stinner59e08022014-08-28 11:19:25 +0200392
393 if fut.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700394 return fut.result()
395 else:
396 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100397 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700398 raise futures.TimeoutError()
399 finally:
400 timeout_handle.cancel()
401
402
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200403async def _wait(fs, timeout, return_when, loop):
Victor Stinner2ba8ece2016-04-01 21:39:09 +0200404 """Internal helper for wait() and wait_for().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700405
406 The fs argument must be a collection of Futures.
407 """
408 assert fs, 'Set of Futures is empty.'
Yury Selivanov7661db62016-05-16 15:38:39 -0400409 waiter = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700410 timeout_handle = None
411 if timeout is not None:
412 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
413 counter = len(fs)
414
415 def _on_completion(f):
416 nonlocal counter
417 counter -= 1
418 if (counter <= 0 or
419 return_when == FIRST_COMPLETED or
420 return_when == FIRST_EXCEPTION and (not f.cancelled() and
421 f.exception() is not None)):
422 if timeout_handle is not None:
423 timeout_handle.cancel()
424 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200425 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700426
427 for f in fs:
428 f.add_done_callback(_on_completion)
429
430 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200431 await waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700432 finally:
433 if timeout_handle is not None:
434 timeout_handle.cancel()
435
436 done, pending = set(), set()
437 for f in fs:
438 f.remove_done_callback(_on_completion)
439 if f.done():
440 done.add(f)
441 else:
442 pending.add(f)
443 return done, pending
444
445
446# This is *not* a @coroutine! It is just an iterator (yielding Futures).
447def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800448 """Return an iterator whose values are coroutines.
449
450 When waiting for the yielded coroutines you'll get the results (or
451 exceptions!) of the original Futures (or coroutines), in the order
452 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700453
454 This differs from PEP 3148; the proper way to use this is:
455
456 for f in as_completed(fs):
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200457 result = await f # The 'await' may raise.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700458 # Use result.
459
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200460 If a timeout is specified, the 'await' will raise
Guido van Rossumb58f0532014-02-12 17:58:19 -0800461 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700462
463 Note: The futures 'f' are not necessarily members of fs.
464 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700465 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Yury Selivanov6370f342017-12-10 18:36:12 -0500466 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700467 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400468 todo = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800469 from .queues import Queue # Import here to avoid circular import problem.
470 done = Queue(loop=loop)
471 timeout_handle = None
472
473 def _on_timeout():
474 for f in todo:
475 f.remove_done_callback(_on_completion)
476 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
477 todo.clear() # Can't do todo.remove(f) in the loop.
478
479 def _on_completion(f):
480 if not todo:
481 return # _on_timeout() was here first.
482 todo.remove(f)
483 done.put_nowait(f)
484 if not todo and timeout_handle is not None:
485 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700486
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200487 async def _wait_for_one():
488 f = await done.get()
Guido van Rossumb58f0532014-02-12 17:58:19 -0800489 if f is None:
490 # Dummy value from _on_timeout().
491 raise futures.TimeoutError
492 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700493
Guido van Rossumb58f0532014-02-12 17:58:19 -0800494 for f in todo:
495 f.add_done_callback(_on_completion)
496 if todo and timeout is not None:
497 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700498 for _ in range(len(todo)):
499 yield _wait_for_one()
500
501
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200502@types.coroutine
503def __sleep0():
504 """Skip one event loop run cycle.
505
506 This is a private helper for 'asyncio.sleep()', used
507 when the 'delay' is set to 0. It uses a bare 'yield'
508 expression (which Task._step knows how to handle)
509 instead of creating a Future object.
510 """
511 yield
512
513
514async def sleep(delay, result=None, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700515 """Coroutine that completes after a given time (in seconds)."""
Andrew Svetlov5382c052017-12-17 16:41:30 +0200516 if delay <= 0:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200517 await __sleep0()
Yury Selivanovade04122015-11-05 14:29:04 -0500518 return result
519
Yury Selivanov7661db62016-05-16 15:38:39 -0400520 if loop is None:
521 loop = events.get_event_loop()
522 future = loop.create_future()
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500523 h = loop.call_later(delay,
524 futures._set_result_unless_cancelled,
525 future, result)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700526 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200527 return await future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700528 finally:
529 h.cancel()
530
531
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400532def ensure_future(coro_or_future, *, loop=None):
Yury Selivanov620279b2015-10-02 15:00:19 -0400533 """Wrap a coroutine or an awaitable in a future.
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400534
535 If the argument is a Future, it is returned directly.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700536 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700537 if futures.isfuture(coro_or_future):
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500538 if loop is not None and loop is not futures._get_loop(coro_or_future):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700539 raise ValueError('loop argument must agree with Future')
540 return coro_or_future
Victor Stinnerf951d282014-06-29 00:46:45 +0200541 elif coroutines.iscoroutine(coro_or_future):
Victor Stinner896a25a2014-07-08 11:29:25 +0200542 if loop is None:
543 loop = events.get_event_loop()
544 task = loop.create_task(coro_or_future)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200545 if task._source_traceback:
546 del task._source_traceback[-1]
547 return task
Victor Stinner3f438a92017-11-28 14:43:52 +0100548 elif inspect.isawaitable(coro_or_future):
Yury Selivanov620279b2015-10-02 15:00:19 -0400549 return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700550 else:
Charles Renwickae5b3262017-04-21 16:49:48 -0400551 raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
552 'required')
Yury Selivanov620279b2015-10-02 15:00:19 -0400553
554
555@coroutine
556def _wrap_awaitable(awaitable):
557 """Helper for asyncio.ensure_future().
558
559 Wraps awaitable (an object with __await__) into a coroutine
560 that will later be wrapped in a Task by ensure_future().
561 """
562 return (yield from awaitable.__await__())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700563
564
565class _GatheringFuture(futures.Future):
566 """Helper for gather().
567
568 This overrides cancel() to cancel all the children and act more
569 like Task.cancel(), which doesn't immediately mark itself as
570 cancelled.
571 """
572
573 def __init__(self, children, *, loop=None):
574 super().__init__(loop=loop)
575 self._children = children
576
577 def cancel(self):
578 if self.done():
579 return False
Yury Selivanov3d676152016-10-21 17:22:17 -0400580 ret = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700581 for child in self._children:
Yury Selivanov3d676152016-10-21 17:22:17 -0400582 if child.cancel():
583 ret = True
584 return ret
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700585
586
587def gather(*coros_or_futures, loop=None, return_exceptions=False):
Yury Selivanov36c2c042017-12-19 07:19:53 -0500588 """Return a future aggregating results from the given coroutines/futures.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700589
Guido van Rossume3c65a72016-09-30 08:17:15 -0700590 Coroutines will be wrapped in a future and scheduled in the event
591 loop. They will not necessarily be scheduled in the same order as
592 passed in.
593
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700594 All futures must share the same event loop. If all the tasks are
595 done successfully, the returned future's result is the list of
596 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500597 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700598 exceptions in the tasks are treated the same as successful
599 results, and gathered in the result list; otherwise, the first
600 raised exception will be immediately propagated to the returned
601 future.
602
603 Cancellation: if the outer Future is cancelled, all children (that
604 have not completed yet) are also cancelled. If any child is
605 cancelled, this is treated as if it raised CancelledError --
606 the outer Future is *not* cancelled in this case. (This is to
607 prevent the cancellation of one child to cause other children to
608 be cancelled.)
609 """
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200610 if not coros_or_futures:
Yury Selivanov7661db62016-05-16 15:38:39 -0400611 if loop is None:
612 loop = events.get_event_loop()
613 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700614 outer.set_result([])
615 return outer
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200616
Yury Selivanov36c2c042017-12-19 07:19:53 -0500617 def _done_callback(fut):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700618 nonlocal nfinished
Yury Selivanov36c2c042017-12-19 07:19:53 -0500619 nfinished += 1
620
Victor Stinner3531d902015-01-09 01:42:52 +0100621 if outer.done():
622 if not fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700623 # Mark exception retrieved.
624 fut.exception()
625 return
Victor Stinner3531d902015-01-09 01:42:52 +0100626
Yury Selivanov36c2c042017-12-19 07:19:53 -0500627 if not return_exceptions:
628 if fut.cancelled():
629 # Check if 'fut' is cancelled first, as
630 # 'fut.exception()' will *raise* a CancelledError
631 # instead of returning it.
632 exc = futures.CancelledError()
633 outer.set_exception(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700634 return
Yury Selivanov36c2c042017-12-19 07:19:53 -0500635 else:
636 exc = fut.exception()
637 if exc is not None:
638 outer.set_exception(exc)
639 return
640
641 if nfinished == nfuts:
642 # All futures are done; create a list of results
643 # and set it to the 'outer' future.
644 results = []
645
646 for fut in children:
647 if fut.cancelled():
648 # Check if 'fut' is cancelled first, as
649 # 'fut.exception()' will *raise* a CancelledError
650 # instead of returning it.
651 res = futures.CancelledError()
652 else:
653 res = fut.exception()
654 if res is None:
655 res = fut.result()
656 results.append(res)
657
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700658 outer.set_result(results)
659
Yury Selivanov36c2c042017-12-19 07:19:53 -0500660 arg_to_fut = {}
661 children = []
662 nfuts = 0
663 nfinished = 0
664 for arg in coros_or_futures:
665 if arg not in arg_to_fut:
666 fut = ensure_future(arg, loop=loop)
667 if loop is None:
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500668 loop = futures._get_loop(fut)
Yury Selivanov36c2c042017-12-19 07:19:53 -0500669 if fut is not arg:
670 # 'arg' was not a Future, therefore, 'fut' is a new
671 # Future created specifically for 'arg'. Since the caller
672 # can't control it, disable the "destroy pending task"
673 # warning.
674 fut._log_destroy_pending = False
675
676 nfuts += 1
677 arg_to_fut[arg] = fut
678 fut.add_done_callback(_done_callback)
679
680 else:
681 # There's a duplicate Future object in coros_or_futures.
682 fut = arg_to_fut[arg]
683
684 children.append(fut)
685
686 outer = _GatheringFuture(children, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700687 return outer
688
689
690def shield(arg, *, loop=None):
691 """Wait for a future, shielding it from cancellation.
692
693 The statement
694
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200695 res = await shield(something())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700696
697 is exactly equivalent to the statement
698
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200699 res = await something()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700700
701 *except* that if the coroutine containing it is cancelled, the
702 task running in something() is not cancelled. From the POV of
703 something(), the cancellation did not happen. But its caller is
704 still cancelled, so the yield-from expression still raises
705 CancelledError. Note: If something() is cancelled by other means
706 this will still cancel shield().
707
708 If you want to completely ignore cancellation (not recommended)
709 you can combine shield() with a try/except clause, as follows:
710
711 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200712 res = await shield(something())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700713 except CancelledError:
714 res = None
715 """
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400716 inner = ensure_future(arg, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700717 if inner.done():
718 # Shortcut.
719 return inner
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500720 loop = futures._get_loop(inner)
Yury Selivanov7661db62016-05-16 15:38:39 -0400721 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700722
723 def _done_callback(inner):
724 if outer.cancelled():
Victor Stinner3531d902015-01-09 01:42:52 +0100725 if not inner.cancelled():
726 # Mark inner's result as retrieved.
727 inner.exception()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700728 return
Victor Stinner3531d902015-01-09 01:42:52 +0100729
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700730 if inner.cancelled():
731 outer.cancel()
732 else:
733 exc = inner.exception()
734 if exc is not None:
735 outer.set_exception(exc)
736 else:
737 outer.set_result(inner.result())
738
739 inner.add_done_callback(_done_callback)
740 return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700741
742
743def run_coroutine_threadsafe(coro, loop):
744 """Submit a coroutine object to a given event loop.
745
746 Return a concurrent.futures.Future to access the result.
747 """
748 if not coroutines.iscoroutine(coro):
749 raise TypeError('A coroutine object is required')
750 future = concurrent.futures.Future()
751
752 def callback():
Guido van Rossum601953b2015-10-05 16:20:00 -0700753 try:
754 futures._chain_future(ensure_future(coro, loop=loop), future)
755 except Exception as exc:
756 if future.set_running_or_notify_cancel():
757 future.set_exception(exc)
758 raise
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700759
760 loop.call_soon_threadsafe(callback)
761 return future
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200762
763
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500764# WeakSet containing all alive tasks.
765_all_tasks = weakref.WeakSet()
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200766
767# Dictionary containing tasks that are currently active in
768# all running event loops. {EventLoop: Task}
769_current_tasks = {}
770
771
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500772def _register_task(task):
773 """Register a new task in asyncio as executed by loop."""
774 _all_tasks.add(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200775
776
777def _enter_task(loop, task):
778 current_task = _current_tasks.get(loop)
779 if current_task is not None:
780 raise RuntimeError(f"Cannot enter into task {task!r} while another "
781 f"task {current_task!r} is being executed.")
782 _current_tasks[loop] = task
783
784
785def _leave_task(loop, task):
786 current_task = _current_tasks.get(loop)
787 if current_task is not task:
788 raise RuntimeError(f"Leaving task {task!r} does not match "
789 f"the current task {current_task!r}.")
790 del _current_tasks[loop]
791
792
Yury Selivanovca9b36c2017-12-23 15:04:15 -0500793def _unregister_task(task):
794 """Unregister a task."""
795 _all_tasks.discard(task)
Andrew Svetlov44d1a592017-12-16 21:58:38 +0200796
797
798_py_register_task = _register_task
799_py_unregister_task = _unregister_task
800_py_enter_task = _enter_task
801_py_leave_task = _leave_task
802
803
804try:
805 from _asyncio import (_register_task, _unregister_task,
806 _enter_task, _leave_task,
807 _all_tasks, _current_tasks)
808except ImportError:
809 pass
810else:
811 _c_register_task = _register_task
812 _c_unregister_task = _unregister_task
813 _c_enter_task = _enter_task
814 _c_leave_task = _leave_task