blob: d7867d128a8afe97fe7edcb63c5a8b609b04070b [file] [log] [blame]
"""Support for tasks, coroutines and the scheduler."""

# Names exported by a star-import of this module.  The 'async' entry is
# bound further down through globals() rather than by a def statement.
__all__ = ['Task',
           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
           'wait', 'wait_for', 'as_completed', 'sleep', 'async',
           'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
           ]
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import concurrent.futures
10import functools
11import inspect
Yury Selivanov59eb9a42015-05-11 14:48:38 -040012import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013import weakref
14
Yury Selivanova0c1ba62016-10-28 12:52:37 -040015from . import base_tasks
Victor Stinner71080fc2015-07-25 02:23:21 +020016from . import compat
Victor Stinnerf951d282014-06-29 00:46:45 +020017from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import events
19from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020020from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
class Task(futures.Future):
    """A coroutine wrapped in a Future.

    The task drives the coroutine one step at a time through _step(),
    which is scheduled on the event loop with call_soon().  When the
    coroutine yields a Future, the task parks itself on that Future and
    is resumed by _wakeup() once the Future is done.
    """

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # Weak set containing all tasks alive.
    _all_tasks = weakref.WeakSet()

    # Dictionary containing tasks that are currently active in
    # all running event loops.  {EventLoop: Task}
    _current_tasks = {}

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        if loop is None:
            loop = events.get_event_loop()
        return cls._current_tasks.get(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        if loop is None:
            loop = events.get_event_loop()
        return {t for t in cls._all_tasks if t._loop is loop}

    def __init__(self, coro, *, loop=None):
        """Wrap the coroutine object coro and schedule its first step.

        loop is forwarded to Future.__init__().  The new task registers
        itself in the class-level _all_tasks weak set.
        """
        # NOTE(review): this validation is an assert, so it is skipped when
        # Python runs with -O; a non-coroutine would then only fail later.
        assert coroutines.iscoroutine(coro), repr(coro)
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the newest traceback entry (presumably the frame of this
            # constructor -- confirm against Future.__init__).
            del self._source_traceback[-1]
        self._coro = coro
        # Future the coroutine is currently waiting on, if any.
        self._fut_waiter = None
        # Set by cancel() when the cancellation has to be delivered by the
        # next _step() call.
        self._must_cancel = False
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    # On Python 3.3 or older, objects with a destructor that are part of a
    # reference cycle are never destroyed. That's not the case any more on
    # Python 3.4 thanks to the PEP 442.
    if compat.PY34:
        def __del__(self):
            """Report a still-pending task, then run Future.__del__().

            The report goes through the loop's exception handler and is
            skipped when _log_destroy_pending is false.
            """
            if self._state == futures._PENDING and self._log_destroy_pending:
                context = {
                    'task': self,
                    'message': 'Task was destroyed but it is pending!',
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
            futures.Future.__del__(self)

    def _repr_info(self):
        """Return the repr details computed by base_tasks._task_repr_info()."""
        return base_tasks._task_repr_info(self)

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, exc=None):
        """Advance the wrapped coroutine by one step.

        With exc None the coroutine is resumed with send(None);
        otherwise exc is thrown into it.  A pending cancellation request
        (_must_cancel) replaces exc with a CancelledError.  Depending on
        how the coroutine reacts, the task is completed (result,
        exception or cancellation), parked on the yielded Future, or
        rescheduled with call_soon().
        """
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}'.format(self, exc)
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        # Publish this task as the one currently running on its loop; the
        # entry is removed again in the finally clause below.
        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            # The coroutine returned; exc.value carries its return value.
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                self.set_exception(futures.CancelledError())
            else:
                self.set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            # Record the exception on the task, but still let it propagate
            # to the caller of _step().
            self.set_exception(exc)
            raise
        else:
            # The coroutine yielded `result`.  Objects carrying an
            # _asyncio_future_blocking attribute are treated as Futures.
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if result._loop is not self._loop:
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'Task {!r} got Future {!r} attached to a '
                            'different loop'.format(self, result)))
                elif blocking:
                    if result is self:
                        self._loop.call_soon(
                            self._step,
                            RuntimeError(
                                'Task cannot await on itself: {!r}'.format(
                                    self)))
                    else:
                        # Park on the Future: _wakeup() will resume us.
                        result._asyncio_future_blocking = False
                        result.add_done_callback(self._wakeup)
                        self._fut_waiter = result
                        if self._must_cancel:
                            # cancel() was requested while this step ran;
                            # forward it to the Future we now wait on.
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    # A Future whose blocking flag is false was yielded.
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        """Done-callback for the Future the coroutine is waiting on.

        Resumes the coroutine through _step(); if the Future failed (or
        was cancelled) the exception is passed along so that it gets
        thrown into the coroutine.
        """
        try:
            future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # `_step()` called without an exception resumes the coroutine
            # with `send(None)`.
            self._step()
        self = None  # Needed to break cycles when an exception occurs.
262
263
# Keep the Task class defined above reachable under a private name, since
# the public name may be rebound just below.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    # No _asyncio module available: keep using the Task class defined above.
    pass
else:
    # Prefer the Task implementation provided by the _asyncio module.
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task
274
275
# wait() and as_completed() similar to those in PEP 3148.

# Accepted values for the return_when argument of wait(); they are the
# constants of the same names from concurrent.futures.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
281
282
@coroutine
def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait until the Futures and coroutines in fs complete.

    fs must be a non-empty collection; passing a single Future or
    coroutine raises TypeError, an empty collection or an unknown
    return_when raises ValueError.

    Each element is passed through ensure_future() before waiting.

    Returns a pair of sets of Futures: (done, pending).

    Usage:

        done, pending = yield from asyncio.wait(fs)

    Note: a timeout never raises TimeoutError.  Futures that are still
    running when the timeout expires are reported in the pending set.
    """
    got_single_awaitable = (futures.isfuture(fs) or
                            coroutines.iscoroutine(fs))
    if got_single_awaitable:
        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    accepted_modes = (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED)
    if return_when not in accepted_modes:
        raise ValueError('Invalid return_when value: {}'.format(return_when))

    if loop is None:
        loop = events.get_event_loop()

    # Duplicates are dropped before the elements are wrapped.
    wrapped = {ensure_future(item, loop=loop) for item in set(fs)}

    outcome = yield from _wait(wrapped, timeout, return_when, loop)
    return outcome
313
314
def _release_waiter(waiter, *args):
    """Complete waiter with a None result unless it is already done.

    Extra positional arguments are accepted and ignored, which lets the
    function be registered directly as a callback.
    """
    if waiter.done():
        return
    waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700318
319
@coroutine
def wait_for(fut, timeout, *, loop=None):
    """Wait for a single Future or coroutine to complete, with a timeout.

    fut is passed through ensure_future() unless timeout is None, in
    which case it is simply waited on.

    Returns the result of the Future or coroutine.  If the timeout
    expires first, fut is cancelled and TimeoutError is raised.  To
    avoid that cancellation, wrap fut in shield().

    If the wait itself is cancelled, fut is cancelled as well.

    This function is a coroutine.
    """
    if loop is None:
        loop = events.get_event_loop()

    if timeout is None:
        # No deadline: wait on fut directly.
        outcome = yield from fut
        return outcome

    waiter = loop.create_future()
    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    release = functools.partial(_release_waiter, waiter)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(release)

    def _abandon():
        # Stop listening to fut and request its cancellation.
        fut.remove_done_callback(release)
        fut.cancel()

    try:
        # The waiter is released either by fut completing or by the timer.
        try:
            yield from waiter
        except futures.CancelledError:
            _abandon()
            raise

        if not fut.done():
            # The timer fired before fut completed.
            _abandon()
            raise futures.TimeoutError()
        return fut.result()
    finally:
        timeout_handle.cancel()
364
365
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    fs must be a non-empty collection of Futures.  Returns the pair
    (done, pending) of sets once the return_when condition is met or
    the timeout expires.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    if timeout is None:
        timeout_handle = None
    else:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    remaining = len(fs)

    def _on_completion(f):
        nonlocal remaining
        remaining -= 1
        # Evaluated left to right; f.exception() is only consulted for
        # FIRST_EXCEPTION and only when f was not cancelled.
        should_wake = (
            remaining <= 0
            or return_when == FIRST_COMPLETED
            or (return_when == FIRST_EXCEPTION
                and not f.cancelled()
                and f.exception() is not None))
        if not should_wake:
            return
        if timeout_handle is not None:
            timeout_handle.cancel()
        if not waiter.done():
            waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()

    # Detach from every future and sort them by completion state.
    done, pending = set(), set()
    for f in fs:
        f.remove_done_callback(_on_completion)
        bucket = done if f.done() else pending
        bucket.add(f)
    return done, pending
408
409
# This is *not* a @coroutine!  It is a plain generator function whose
# iterator yields coroutines.
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    Waiting on the yielded coroutines gives the results (or raises the
    exceptions!) of the original Futures (or coroutines), in the order
    in which they complete and as soon as they do.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = yield from f  # The 'yield from' may raise.
            # Use result.

    If a timeout is given, the 'yield from' raises TimeoutError when
    the timeout expires before all Futures are done.

    Note: The values 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
    if loop is None:
        loop = events.get_event_loop()
    pending = {ensure_future(item, loop=loop) for item in set(fs)}
    from .queues import Queue  # Import here to avoid circular import problem.
    finished = Queue(loop=loop)
    timeout_handle = None

    def _on_timeout():
        for fut in pending:
            fut.remove_done_callback(_on_completion)
            # Queue a dummy value for _wait_for_one().
            finished.put_nowait(None)
        # Can't remove elements from the set while iterating over it.
        pending.clear()

    def _on_completion(fut):
        if not pending:
            return  # _on_timeout() was here first.
        pending.remove(fut)
        finished.put_nowait(fut)
        if not pending and timeout_handle is not None:
            timeout_handle.cancel()

    @coroutine
    def _wait_for_one():
        fut = yield from finished.get()
        if fut is None:
            # Dummy value from _on_timeout().
            raise futures.TimeoutError
        return fut.result()  # May raise fut.exception().

    for fut in pending:
        fut.add_done_callback(_on_completion)
    if pending and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    # One coroutine is handed out per future being waited on.
    for _ in range(len(pending)):
        yield _wait_for_one()
465
466
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after delay seconds and returns result.

    A delay equal to 0 does not arm a timer: the coroutine performs a
    single bare yield and then returns.
    """
    if delay == 0:
        yield
        return result

    if loop is None:
        loop = events.get_event_loop()
    waiter = loop.create_future()
    # The timer sets the waiter's result unless it got cancelled meanwhile.
    timer = waiter._loop.call_later(
        delay, futures._set_result_unless_cancelled, waiter, result)
    try:
        value = yield from waiter
        return value
    finally:
        # Always disarm the timer, e.g. when the sleep is cancelled early.
        timer.cancel()
484
485
def async_(coro_or_future, *, loop=None):
    """Wrap a coroutine in a future.

    A Future argument is returned unchanged.

    Deprecated since 3.5: emits a DeprecationWarning and delegates to
    ensure_future(), which should be called instead.
    """
    message = "asyncio.async() function is deprecated, use ensure_future()"
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return ensure_future(coro_or_future, loop=loop)


# Silence DeprecationWarning: the function is published under the name
# 'async' through globals() instead of being defined with that name.
async_.__name__ = 'async'
globals()['async'] = async_
del async_
504
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400505
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    A Future argument is returned unchanged; ValueError is raised when
    an explicit loop differs from the Future's own loop.  A coroutine is
    turned into a task with loop.create_task().  Any other awaitable is
    first wrapped in a coroutine.  Everything else raises TypeError.
    """
    if futures.isfuture(coro_or_future):
        loop_conflict = (loop is not None and
                         loop is not coro_or_future._loop)
        if loop_conflict:
            raise ValueError('loop argument must agree with Future')
        return coro_or_future

    if coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            # Drop the newest entry of the recorded traceback.
            del task._source_traceback[-1]
        return task

    if compat.PY35 and inspect.isawaitable(coro_or_future):
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)

    raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                    'required')
Yury Selivanov620279b2015-10-02 15:00:19 -0400527
528
@coroutine
def _wrap_awaitable(awaitable):
    """Helper for asyncio.ensure_future().

    Turns awaitable (an object with __await__) into a coroutine, which
    ensure_future() then wraps in a Task.
    """
    outcome = yield from awaitable.__await__()
    return outcome
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700537
538
class _GatheringFuture(futures.Future):
    """Helper for gather().

    cancel() is overridden to cancel every child instead of this future
    itself, so that, like Task.cancel(), it does not immediately mark
    the future as cancelled.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children

    def cancel(self):
        """Cancel all children; return True if any child accepted it."""
        if self.done():
            return False
        # Build the full list first so that cancel() is called on every
        # child, not only up to the first one that accepts.
        accepted = [child.cancel() for child in self._children]
        return any(accepted)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700559
560
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines
    or futures.

    Coroutines will be wrapped in a future and scheduled in the event
    loop. They will not necessarily be scheduled in the same order as
    passed in.

    All futures must share the same event loop (ValueError is raised
    otherwise).  If all the tasks are done successfully, the returned
    future's result is the list of results (in the order of the
    original sequence, not necessarily the order of results arrival).
    If *return_exceptions* is True, exceptions in the tasks are treated
    the same as successful results, and gathered in the result list;
    otherwise, the first raised exception will be immediately
    propagated to the returned future.

    Calling gather() without positional arguments returns a future that
    is already done with an empty list as its result.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)

    The outcome of each child is read through the public Future methods
    cancelled(), exception() and result().
    """
    if not coros_or_futures:
        if loop is None:
            loop = events.get_event_loop()
        outer = loop.create_future()
        outer.set_result([])
        return outer

    # Wrap every distinct argument exactly once, so that an argument given
    # several times maps to a single child future.
    arg_to_fut = {}
    for arg in set(coros_or_futures):
        if not futures.isfuture(arg):
            fut = ensure_future(arg, loop=loop)
            if loop is None:
                loop = fut._loop
            # The caller cannot control this future, the "destroy pending task"
            # warning should not be emitted.
            fut._log_destroy_pending = False
        else:
            fut = arg
            if loop is None:
                loop = fut._loop
            elif fut._loop is not loop:
                raise ValueError("futures are tied to different event loops")
        arg_to_fut[arg] = fut

    # Restore the caller's ordering (and multiplicity) of the arguments.
    children = [arg_to_fut[arg] for arg in coros_or_futures]
    nchildren = len(children)
    outer = _GatheringFuture(children, loop=loop)
    nfinished = 0
    results = [None] * nchildren

    def _done_callback(i, fut):
        nonlocal nfinished
        if outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if fut.cancelled():
            # Check cancellation first: exception() would *raise*
            # CancelledError on a cancelled future instead of returning it.
            res = futures.CancelledError()
            if not return_exceptions:
                outer.set_exception(res)
                return
        else:
            # Use the public accessors rather than the private _exception /
            # _result attributes, which future-compatible objects accepted
            # by isfuture() are not required to provide.
            res = fut.exception()  # Also marks the exception retrieved.
            if res is not None:
                if not return_exceptions:
                    outer.set_exception(res)
                    return
            else:
                res = fut.result()
        results[i] = res
        nfinished += 1
        if nfinished == nchildren:
            outer.set_result(results)

    for i, fut in enumerate(children):
        fut.add_done_callback(functools.partial(_done_callback, i))
    return outer
643
644
def shield(arg, *, loop=None):
    """Wait for a future, shielding it from cancellation.

    The statement

        res = yield from shield(something())

    behaves exactly like

        res = yield from something()

    *except* when the coroutine containing it is cancelled: the task
    running in something() is then left alone.  From the point of view
    of something(), no cancellation happened.  Its caller is still
    cancelled though, so the yield-from expression still raises
    CancelledError.  Note: if something() is cancelled by other means,
    shield() is cancelled too.

    To ignore cancellation completely (not recommended), combine
    shield() with a try/except clause:

        try:
            res = yield from shield(something())
        except CancelledError:
            res = None
    """
    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Shortcut.
        return inner
    outer = inner._loop.create_future()

    def _done_callback(finished):
        if outer.cancelled():
            if not finished.cancelled():
                # Mark inner's result as retrieved.
                finished.exception()
            return

        if finished.cancelled():
            outer.cancel()
            return

        # Mirror the outcome of the inner future onto the outer one.
        exc = finished.exception()
        if exc is None:
            outer.set_result(finished.result())
        else:
            outer.set_exception(exc)

    inner.add_done_callback(_done_callback)
    return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700696
697
def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    The coroutine is scheduled through loop.call_soon_threadsafe().
    Returns a concurrent.futures.Future to access the result.  Raises
    TypeError if coro is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    outcome = concurrent.futures.Future()

    def callback():
        try:
            task = ensure_future(coro, loop=loop)
            futures._chain_future(task, outcome)
        except Exception as exc:
            # Hand the failure to whoever waits on the returned future
            # (unless it was cancelled), then let it propagate.
            if outcome.set_running_or_notify_cancel():
                outcome.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return outcome