blob: c5122f760715ac28b324697b03e8546b3d249f25 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
Yury Selivanov6370f342017-12-10 18:36:12 -05003__all__ = (
4 'Task',
5 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
Yury Selivanov9edad3c2017-12-11 10:03:48 -05006 'wait', 'wait_for', 'as_completed', 'sleep',
Yury Selivanov6370f342017-12-10 18:36:12 -05007 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
8)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010import concurrent.futures
11import functools
12import inspect
Andrew Svetlov5f841b52017-12-09 00:23:48 +020013import types
Yury Selivanov59eb9a42015-05-11 14:48:38 -040014import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015import weakref
16
Yury Selivanova0c1ba62016-10-28 12:52:37 -040017from . import base_tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020018from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019from . import events
20from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020021from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024class Task(futures.Future):
25 """A coroutine wrapped in a Future."""
26
27 # An important invariant maintained while a Task not done:
28 #
29 # - Either _fut_waiter is None, and _step() is scheduled;
30 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
31 #
32 # The only transition from the latter to the former is through
33 # _wakeup(). When _fut_waiter is not None, one of its callbacks
34 # must be _wakeup().
35
36 # Weak set containing all tasks alive.
37 _all_tasks = weakref.WeakSet()
38
Guido van Rossum1a605ed2013-12-06 12:57:40 -080039 # Dictionary containing tasks that are currently active in
40 # all running event loops. {EventLoop: Task}
41 _current_tasks = {}
42
Victor Stinnerfe22e092014-12-04 23:00:13 +010043 # If False, don't log a message if the task is destroyed whereas its
44 # status is still pending
45 _log_destroy_pending = True
46
Guido van Rossum1a605ed2013-12-06 12:57:40 -080047 @classmethod
48 def current_task(cls, loop=None):
49 """Return the currently running task in an event loop or None.
50
51 By default the current task for the current event loop is returned.
52
53 None is returned when called not in the context of a Task.
54 """
55 if loop is None:
56 loop = events.get_event_loop()
57 return cls._current_tasks.get(loop)
58
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070059 @classmethod
60 def all_tasks(cls, loop=None):
61 """Return a set of all tasks for an event loop.
62
63 By default all tasks for the current event loop are returned.
64 """
65 if loop is None:
66 loop = events.get_event_loop()
67 return {t for t in cls._all_tasks if t._loop is loop}
68
69 def __init__(self, coro, *, loop=None):
Victor Stinner15cc6782015-01-09 00:09:10 +010070 assert coroutines.iscoroutine(coro), repr(coro)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070071 super().__init__(loop=loop)
Victor Stinner80f53aa2014-06-27 13:52:20 +020072 if self._source_traceback:
73 del self._source_traceback[-1]
Yury Selivanov1ad08a52015-05-28 10:52:19 -040074 self._coro = coro
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070075 self._fut_waiter = None
76 self._must_cancel = False
77 self._loop.call_soon(self._step)
78 self.__class__._all_tasks.add(self)
79
INADA Naoki3e2ad8e2017-04-25 10:57:18 +090080 def __del__(self):
81 if self._state == futures._PENDING and self._log_destroy_pending:
82 context = {
83 'task': self,
84 'message': 'Task was destroyed but it is pending!',
85 }
86 if self._source_traceback:
87 context['source_traceback'] = self._source_traceback
88 self._loop.call_exception_handler(context)
89 futures.Future.__del__(self)
Victor Stinnera02f81f2014-06-24 22:37:53 +020090
Victor Stinner313a9802014-07-29 12:58:23 +020091 def _repr_info(self):
Yury Selivanova0c1ba62016-10-28 12:52:37 -040092 return base_tasks._task_repr_info(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070093
94 def get_stack(self, *, limit=None):
95 """Return the list of stack frames for this task's coroutine.
96
Victor Stinnerd87de832014-12-02 17:57:04 +010097 If the coroutine is not done, this returns the stack where it is
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070098 suspended. If the coroutine has completed successfully or was
99 cancelled, this returns an empty list. If the coroutine was
100 terminated by an exception, this returns the list of traceback
101 frames.
102
103 The frames are always ordered from oldest to newest.
104
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500105 The optional limit gives the maximum number of frames to
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700106 return; by default all available frames are returned. Its
107 meaning differs depending on whether a stack or a traceback is
108 returned: the newest frames of a stack are returned, but the
109 oldest frames of a traceback are returned. (This matches the
110 behavior of the traceback module.)
111
112 For reasons beyond our control, only one stack frame is
113 returned for a suspended coroutine.
114 """
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400115 return base_tasks._task_get_stack(self, limit)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700116
117 def print_stack(self, *, limit=None, file=None):
118 """Print the stack or traceback for this task's coroutine.
119
120 This produces output similar to that of the traceback module,
121 for the frames retrieved by get_stack(). The limit argument
122 is passed to get_stack(). The file argument is an I/O stream
R David Murray8e069d52014-09-24 13:13:45 -0400123 to which the output is written; by default output is written
124 to sys.stderr.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700125 """
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400126 return base_tasks._task_print_stack(self, limit, file)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700127
128 def cancel(self):
R David Murray8e069d52014-09-24 13:13:45 -0400129 """Request that this task cancel itself.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200130
Victor Stinner8d213572014-06-02 23:06:46 +0200131 This arranges for a CancelledError to be thrown into the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200132 wrapped coroutine on the next cycle through the event loop.
133 The coroutine then has a chance to clean up or even deny
134 the request using try/except/finally.
135
R David Murray8e069d52014-09-24 13:13:45 -0400136 Unlike Future.cancel, this does not guarantee that the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200137 task will be cancelled: the exception might be caught and
R David Murray8e069d52014-09-24 13:13:45 -0400138 acted upon, delaying cancellation of the task or preventing
139 cancellation completely. The task may also return a value or
140 raise a different exception.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200141
142 Immediately after this method is called, Task.cancelled() will
143 not return True (unless the task was already cancelled). A
144 task will be marked as cancelled when the wrapped coroutine
145 terminates with a CancelledError exception (even if cancel()
146 was not called).
147 """
Yury Selivanov7ce1c6f2017-06-11 13:49:18 +0000148 self._log_traceback = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700149 if self.done():
150 return False
151 if self._fut_waiter is not None:
152 if self._fut_waiter.cancel():
153 # Leave self._fut_waiter; it may be a Task that
154 # catches and ignores the cancellation so we may have
155 # to cancel it again later.
156 return True
157 # It must be the case that self._step is already scheduled.
158 self._must_cancel = True
159 return True
160
Yury Selivanovd59bba82015-11-20 12:41:03 -0500161 def _step(self, exc=None):
Yury Selivanov6370f342017-12-10 18:36:12 -0500162 assert not self.done(), f'_step(): already done: {self!r}, {exc!r}'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700163 if self._must_cancel:
164 if not isinstance(exc, futures.CancelledError):
165 exc = futures.CancelledError()
166 self._must_cancel = False
167 coro = self._coro
168 self._fut_waiter = None
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800169
170 self.__class__._current_tasks[self._loop] = self
Yury Selivanovd59bba82015-11-20 12:41:03 -0500171 # Call either coro.throw(exc) or coro.send(None).
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700172 try:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500173 if exc is None:
174 # We use the `send` method directly, because coroutines
175 # don't have `__iter__` and `__next__` methods.
176 result = coro.send(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700177 else:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500178 result = coro.throw(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700179 except StopIteration as exc:
INADA Naoki991adca2017-05-11 21:18:38 +0900180 if self._must_cancel:
181 # Task is cancelled right before coro stops.
182 self._must_cancel = False
183 self.set_exception(futures.CancelledError())
184 else:
185 self.set_result(exc.value)
Yury Selivanov4145c832016-10-09 12:19:12 -0400186 except futures.CancelledError:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700187 super().cancel() # I.e., Future.cancel(self).
188 except Exception as exc:
189 self.set_exception(exc)
190 except BaseException as exc:
191 self.set_exception(exc)
192 raise
193 else:
Guido van Rossum1140a032016-09-09 12:54:54 -0700194 blocking = getattr(result, '_asyncio_future_blocking', None)
195 if blocking is not None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700196 # Yielded Future must come from Future.__iter__().
Yury Selivanov0ac3a0c2015-12-11 11:33:59 -0500197 if result._loop is not self._loop:
Yury Selivanov6370f342017-12-10 18:36:12 -0500198 new_exc = RuntimeError(
199 f'Task {self!r} got Future '
200 f'{result!r} attached to a different loop')
201 self._loop.call_soon(self._step, new_exc)
Guido van Rossum1140a032016-09-09 12:54:54 -0700202 elif blocking:
Yury Selivanov4145c832016-10-09 12:19:12 -0400203 if result is self:
Yury Selivanov6370f342017-12-10 18:36:12 -0500204 new_exc = RuntimeError(
205 f'Task cannot await on itself: {self!r}')
206 self._loop.call_soon(self._step, new_exc)
Yury Selivanov4145c832016-10-09 12:19:12 -0400207 else:
208 result._asyncio_future_blocking = False
209 result.add_done_callback(self._wakeup)
210 self._fut_waiter = result
211 if self._must_cancel:
212 if self._fut_waiter.cancel():
213 self._must_cancel = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700214 else:
Yury Selivanov6370f342017-12-10 18:36:12 -0500215 new_exc = RuntimeError(
216 f'yield was used instead of yield from '
217 f'in task {self!r} with {result!r}')
218 self._loop.call_soon(self._step, new_exc)
219
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700220 elif result is None:
221 # Bare yield relinquishes control for one event loop iteration.
222 self._loop.call_soon(self._step)
223 elif inspect.isgenerator(result):
224 # Yielding a generator is just wrong.
Yury Selivanov6370f342017-12-10 18:36:12 -0500225 new_exc = RuntimeError(
226 f'yield was used instead of yield from for '
227 f'generator in task {self!r} with {result}')
228 self._loop.call_soon(self._step, new_exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700229 else:
230 # Yielding something else is an error.
Yury Selivanov6370f342017-12-10 18:36:12 -0500231 new_exc = RuntimeError(f'Task got bad yield: {result!r}')
232 self._loop.call_soon(self._step, new_exc)
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800233 finally:
234 self.__class__._current_tasks.pop(self._loop)
Victor Stinnerd74ac822014-03-04 23:07:08 +0100235 self = None # Needed to break cycles when an exception occurs.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700236
237 def _wakeup(self, future):
238 try:
Yury Selivanova4afc482015-11-16 15:12:10 -0500239 future.result()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700240 except Exception as exc:
241 # This may also be a cancellation.
Yury Selivanovd59bba82015-11-20 12:41:03 -0500242 self._step(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700243 else:
Yury Selivanova4afc482015-11-16 15:12:10 -0500244 # Don't pass the value of `future.result()` explicitly,
245 # as `Future.__iter__` and `Future.__await__` don't need it.
246 # If we call `_step(value, None)` instead of `_step()`,
247 # Python eval loop would use `.send(value)` method call,
248 # instead of `__next__()`, which is slower for futures
249 # that return non-generator iterators from their `__iter__`.
250 self._step()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700251 self = None # Needed to break cycles when an exception occurs.
252
253
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400254_PyTask = Task
255
256
257try:
258 import _asyncio
259except ImportError:
260 pass
261else:
262 # _CTask is needed for tests.
263 Task = _CTask = _asyncio.Task
264
265
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700266# wait() and as_completed() similar to those in PEP 3148.
267
268FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
269FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
270ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
271
272
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200273async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700274 """Wait for the Futures and coroutines given by fs to complete.
275
Victor Stinnerdb74d982014-06-10 11:16:05 +0200276 The sequence futures must not be empty.
277
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700278 Coroutines will be wrapped in Tasks.
279
280 Returns two sets of Future: (done, pending).
281
282 Usage:
283
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200284 done, pending = await asyncio.wait(fs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700285
286 Note: This does not raise TimeoutError! Futures that aren't done
287 when the timeout occurs are returned in the second set.
288 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700289 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Yury Selivanov6370f342017-12-10 18:36:12 -0500290 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700291 if not fs:
292 raise ValueError('Set of coroutines/Futures is empty.')
Victor Stinnere931f7b2014-07-16 18:50:39 +0200293 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
Yury Selivanov6370f342017-12-10 18:36:12 -0500294 raise ValueError(f'Invalid return_when value: {return_when}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700295
296 if loop is None:
297 loop = events.get_event_loop()
298
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400299 fs = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700300
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200301 return await _wait(fs, timeout, return_when, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700302
303
Victor Stinner59e08022014-08-28 11:19:25 +0200304def _release_waiter(waiter, *args):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700305 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200306 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700307
308
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200309async def wait_for(fut, timeout, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700310 """Wait for the single Future or coroutine to complete, with timeout.
311
312 Coroutine will be wrapped in Task.
313
Victor Stinner421e49b2014-01-23 17:40:59 +0100314 Returns result of the Future or coroutine. When a timeout occurs,
315 it cancels the task and raises TimeoutError. To avoid the task
316 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700317
Victor Stinner922bc2c2015-01-15 16:29:10 +0100318 If the wait is cancelled, the task is also cancelled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700319
Victor Stinner922bc2c2015-01-15 16:29:10 +0100320 This function is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700321 """
322 if loop is None:
323 loop = events.get_event_loop()
324
Guido van Rossum48c66c32014-01-29 14:30:38 -0800325 if timeout is None:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200326 return await fut
Guido van Rossum48c66c32014-01-29 14:30:38 -0800327
Victor K4d071892017-10-05 19:04:39 +0300328 if timeout <= 0:
329 fut = ensure_future(fut, loop=loop)
330
331 if fut.done():
332 return fut.result()
333
334 fut.cancel()
335 raise futures.TimeoutError()
336
Yury Selivanov7661db62016-05-16 15:38:39 -0400337 waiter = loop.create_future()
Victor Stinner59e08022014-08-28 11:19:25 +0200338 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
339 cb = functools.partial(_release_waiter, waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700340
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400341 fut = ensure_future(fut, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700342 fut.add_done_callback(cb)
343
344 try:
Victor Stinner59e08022014-08-28 11:19:25 +0200345 # wait until the future completes or the timeout
Victor Stinner922bc2c2015-01-15 16:29:10 +0100346 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200347 await waiter
Victor Stinner922bc2c2015-01-15 16:29:10 +0100348 except futures.CancelledError:
349 fut.remove_done_callback(cb)
350 fut.cancel()
351 raise
Victor Stinner59e08022014-08-28 11:19:25 +0200352
353 if fut.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700354 return fut.result()
355 else:
356 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100357 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700358 raise futures.TimeoutError()
359 finally:
360 timeout_handle.cancel()
361
362
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200363async def _wait(fs, timeout, return_when, loop):
Victor Stinner2ba8ece2016-04-01 21:39:09 +0200364 """Internal helper for wait() and wait_for().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700365
366 The fs argument must be a collection of Futures.
367 """
368 assert fs, 'Set of Futures is empty.'
Yury Selivanov7661db62016-05-16 15:38:39 -0400369 waiter = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700370 timeout_handle = None
371 if timeout is not None:
372 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
373 counter = len(fs)
374
375 def _on_completion(f):
376 nonlocal counter
377 counter -= 1
378 if (counter <= 0 or
379 return_when == FIRST_COMPLETED or
380 return_when == FIRST_EXCEPTION and (not f.cancelled() and
381 f.exception() is not None)):
382 if timeout_handle is not None:
383 timeout_handle.cancel()
384 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200385 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700386
387 for f in fs:
388 f.add_done_callback(_on_completion)
389
390 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200391 await waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700392 finally:
393 if timeout_handle is not None:
394 timeout_handle.cancel()
395
396 done, pending = set(), set()
397 for f in fs:
398 f.remove_done_callback(_on_completion)
399 if f.done():
400 done.add(f)
401 else:
402 pending.add(f)
403 return done, pending
404
405
406# This is *not* a @coroutine! It is just an iterator (yielding Futures).
407def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800408 """Return an iterator whose values are coroutines.
409
410 When waiting for the yielded coroutines you'll get the results (or
411 exceptions!) of the original Futures (or coroutines), in the order
412 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700413
414 This differs from PEP 3148; the proper way to use this is:
415
416 for f in as_completed(fs):
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200417 result = await f # The 'await' may raise.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700418 # Use result.
419
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200420 If a timeout is specified, the 'await' will raise
Guido van Rossumb58f0532014-02-12 17:58:19 -0800421 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700422
423 Note: The futures 'f' are not necessarily members of fs.
424 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700425 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Yury Selivanov6370f342017-12-10 18:36:12 -0500426 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700427 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400428 todo = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800429 from .queues import Queue # Import here to avoid circular import problem.
430 done = Queue(loop=loop)
431 timeout_handle = None
432
433 def _on_timeout():
434 for f in todo:
435 f.remove_done_callback(_on_completion)
436 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
437 todo.clear() # Can't do todo.remove(f) in the loop.
438
439 def _on_completion(f):
440 if not todo:
441 return # _on_timeout() was here first.
442 todo.remove(f)
443 done.put_nowait(f)
444 if not todo and timeout_handle is not None:
445 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700446
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200447 async def _wait_for_one():
448 f = await done.get()
Guido van Rossumb58f0532014-02-12 17:58:19 -0800449 if f is None:
450 # Dummy value from _on_timeout().
451 raise futures.TimeoutError
452 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700453
Guido van Rossumb58f0532014-02-12 17:58:19 -0800454 for f in todo:
455 f.add_done_callback(_on_completion)
456 if todo and timeout is not None:
457 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700458 for _ in range(len(todo)):
459 yield _wait_for_one()
460
461
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200462@types.coroutine
463def __sleep0():
464 """Skip one event loop run cycle.
465
466 This is a private helper for 'asyncio.sleep()', used
467 when the 'delay' is set to 0. It uses a bare 'yield'
468 expression (which Task._step knows how to handle)
469 instead of creating a Future object.
470 """
471 yield
472
473
474async def sleep(delay, result=None, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700475 """Coroutine that completes after a given time (in seconds)."""
Yury Selivanovade04122015-11-05 14:29:04 -0500476 if delay == 0:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200477 await __sleep0()
Yury Selivanovade04122015-11-05 14:29:04 -0500478 return result
479
Yury Selivanov7661db62016-05-16 15:38:39 -0400480 if loop is None:
481 loop = events.get_event_loop()
482 future = loop.create_future()
Victor Stinnera9acbe82014-07-05 15:29:41 +0200483 h = future._loop.call_later(delay,
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500484 futures._set_result_unless_cancelled,
485 future, result)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700486 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200487 return await future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700488 finally:
489 h.cancel()
490
491
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400492def ensure_future(coro_or_future, *, loop=None):
Yury Selivanov620279b2015-10-02 15:00:19 -0400493 """Wrap a coroutine or an awaitable in a future.
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400494
495 If the argument is a Future, it is returned directly.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700496 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700497 if futures.isfuture(coro_or_future):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700498 if loop is not None and loop is not coro_or_future._loop:
499 raise ValueError('loop argument must agree with Future')
500 return coro_or_future
Victor Stinnerf951d282014-06-29 00:46:45 +0200501 elif coroutines.iscoroutine(coro_or_future):
Victor Stinner896a25a2014-07-08 11:29:25 +0200502 if loop is None:
503 loop = events.get_event_loop()
504 task = loop.create_task(coro_or_future)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200505 if task._source_traceback:
506 del task._source_traceback[-1]
507 return task
Victor Stinner3f438a92017-11-28 14:43:52 +0100508 elif inspect.isawaitable(coro_or_future):
Yury Selivanov620279b2015-10-02 15:00:19 -0400509 return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700510 else:
Charles Renwickae5b3262017-04-21 16:49:48 -0400511 raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
512 'required')
Yury Selivanov620279b2015-10-02 15:00:19 -0400513
514
515@coroutine
516def _wrap_awaitable(awaitable):
517 """Helper for asyncio.ensure_future().
518
519 Wraps awaitable (an object with __await__) into a coroutine
520 that will later be wrapped in a Task by ensure_future().
521 """
522 return (yield from awaitable.__await__())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700523
524
525class _GatheringFuture(futures.Future):
526 """Helper for gather().
527
528 This overrides cancel() to cancel all the children and act more
529 like Task.cancel(), which doesn't immediately mark itself as
530 cancelled.
531 """
532
533 def __init__(self, children, *, loop=None):
534 super().__init__(loop=loop)
535 self._children = children
536
537 def cancel(self):
538 if self.done():
539 return False
Yury Selivanov3d676152016-10-21 17:22:17 -0400540 ret = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700541 for child in self._children:
Yury Selivanov3d676152016-10-21 17:22:17 -0400542 if child.cancel():
543 ret = True
544 return ret
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700545
546
547def gather(*coros_or_futures, loop=None, return_exceptions=False):
548 """Return a future aggregating results from the given coroutines
549 or futures.
550
Guido van Rossume3c65a72016-09-30 08:17:15 -0700551 Coroutines will be wrapped in a future and scheduled in the event
552 loop. They will not necessarily be scheduled in the same order as
553 passed in.
554
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700555 All futures must share the same event loop. If all the tasks are
556 done successfully, the returned future's result is the list of
557 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500558 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700559 exceptions in the tasks are treated the same as successful
560 results, and gathered in the result list; otherwise, the first
561 raised exception will be immediately propagated to the returned
562 future.
563
564 Cancellation: if the outer Future is cancelled, all children (that
565 have not completed yet) are also cancelled. If any child is
566 cancelled, this is treated as if it raised CancelledError --
567 the outer Future is *not* cancelled in this case. (This is to
568 prevent the cancellation of one child to cause other children to
569 be cancelled.)
570 """
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200571 if not coros_or_futures:
Yury Selivanov7661db62016-05-16 15:38:39 -0400572 if loop is None:
573 loop = events.get_event_loop()
574 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700575 outer.set_result([])
576 return outer
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200577
578 arg_to_fut = {}
579 for arg in set(coros_or_futures):
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700580 if not futures.isfuture(arg):
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400581 fut = ensure_future(arg, loop=loop)
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200582 if loop is None:
583 loop = fut._loop
584 # The caller cannot control this future, the "destroy pending task"
585 # warning should not be emitted.
586 fut._log_destroy_pending = False
587 else:
588 fut = arg
589 if loop is None:
590 loop = fut._loop
591 elif fut._loop is not loop:
592 raise ValueError("futures are tied to different event loops")
593 arg_to_fut[arg] = fut
594
595 children = [arg_to_fut[arg] for arg in coros_or_futures]
596 nchildren = len(children)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700597 outer = _GatheringFuture(children, loop=loop)
598 nfinished = 0
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200599 results = [None] * nchildren
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700600
601 def _done_callback(i, fut):
602 nonlocal nfinished
Victor Stinner3531d902015-01-09 01:42:52 +0100603 if outer.done():
604 if not fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700605 # Mark exception retrieved.
606 fut.exception()
607 return
Victor Stinner3531d902015-01-09 01:42:52 +0100608
Victor Stinner29342622015-01-29 14:15:19 +0100609 if fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700610 res = futures.CancelledError()
611 if not return_exceptions:
612 outer.set_exception(res)
613 return
614 elif fut._exception is not None:
615 res = fut.exception() # Mark exception retrieved.
616 if not return_exceptions:
617 outer.set_exception(res)
618 return
619 else:
620 res = fut._result
621 results[i] = res
622 nfinished += 1
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200623 if nfinished == nchildren:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700624 outer.set_result(results)
625
626 for i, fut in enumerate(children):
627 fut.add_done_callback(functools.partial(_done_callback, i))
628 return outer
629
630
631def shield(arg, *, loop=None):
632 """Wait for a future, shielding it from cancellation.
633
634 The statement
635
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200636 res = await shield(something())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700637
638 is exactly equivalent to the statement
639
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200640 res = await something()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700641
642 *except* that if the coroutine containing it is cancelled, the
643 task running in something() is not cancelled. From the POV of
644 something(), the cancellation did not happen. But its caller is
645 still cancelled, so the yield-from expression still raises
646 CancelledError. Note: If something() is cancelled by other means
647 this will still cancel shield().
648
649 If you want to completely ignore cancellation (not recommended)
650 you can combine shield() with a try/except clause, as follows:
651
652 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200653 res = await shield(something())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700654 except CancelledError:
655 res = None
656 """
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400657 inner = ensure_future(arg, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700658 if inner.done():
659 # Shortcut.
660 return inner
661 loop = inner._loop
Yury Selivanov7661db62016-05-16 15:38:39 -0400662 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700663
664 def _done_callback(inner):
665 if outer.cancelled():
Victor Stinner3531d902015-01-09 01:42:52 +0100666 if not inner.cancelled():
667 # Mark inner's result as retrieved.
668 inner.exception()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700669 return
Victor Stinner3531d902015-01-09 01:42:52 +0100670
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700671 if inner.cancelled():
672 outer.cancel()
673 else:
674 exc = inner.exception()
675 if exc is not None:
676 outer.set_exception(exc)
677 else:
678 outer.set_result(inner.result())
679
680 inner.add_done_callback(_done_callback)
681 return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700682
683
684def run_coroutine_threadsafe(coro, loop):
685 """Submit a coroutine object to a given event loop.
686
687 Return a concurrent.futures.Future to access the result.
688 """
689 if not coroutines.iscoroutine(coro):
690 raise TypeError('A coroutine object is required')
691 future = concurrent.futures.Future()
692
693 def callback():
Guido van Rossum601953b2015-10-05 16:20:00 -0700694 try:
695 futures._chain_future(ensure_future(coro, loop=loop), future)
696 except Exception as exc:
697 if future.set_running_or_notify_cancel():
698 future.set_exception(exc)
699 raise
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700700
701 loop.call_soon_threadsafe(callback)
702 return future