# blob: 9fe2a2fabf076a351d41044569e8057e5b4c32b3
"""Support for tasks, coroutines and the scheduler."""
2
# Public names of this module.  Note that 'async' is exported as a string
# here and bound through globals() further down in this file.
__all__ = ['Task',
           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
           'wait', 'wait_for', 'as_completed', 'sleep', 'async',
           'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
           ]
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import concurrent.futures
10import functools
11import inspect
Yury Selivanov59eb9a42015-05-11 14:48:38 -040012import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013import weakref
14
Yury Selivanova0c1ba62016-10-28 12:52:37 -040015from . import base_tasks
Victor Stinner71080fc2015-07-25 02:23:21 +020016from . import compat
Victor Stinnerf951d282014-06-29 00:46:45 +020017from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import events
19from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020020from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
class Task(futures.Future):
    """A coroutine wrapped in a Future.

    The task drives the coroutine by calling its send()/throw() methods
    from _step(), which is scheduled on the task's event loop.  The value
    returned by the coroutine becomes the result of the task; an exception
    raised by the coroutine becomes its exception.
    """

    # An important invariant maintained while a Task is not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # Weak set containing all tasks alive.
    _all_tasks = weakref.WeakSet()

    # Dictionary containing tasks that are currently active in
    # all running event loops.  {EventLoop: Task}
    _current_tasks = {}

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        if loop is None:
            loop = events.get_event_loop()
        # _step() registers the task here while the coroutine is running
        # and removes it again afterwards.
        return cls._current_tasks.get(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        if loop is None:
            loop = events.get_event_loop()
        return {t for t in cls._all_tasks if t._loop is loop}

    def __init__(self, coro, *, loop=None):
        """Wrap the coroutine object *coro* and schedule its first step.

        *loop* is forwarded to the Future constructor.
        """
        assert coroutines.iscoroutine(coro), repr(coro)
        super().__init__(loop=loop)
        if self._source_traceback:
            # NOTE(review): presumably drops the frame of this constructor
            # from the traceback recorded by Future.__init__ -- confirm
            # against futures.Future.
            del self._source_traceback[-1]
        self._coro = coro
        # Future the coroutine is currently waiting on (see the invariant
        # described at the top of the class).
        self._fut_waiter = None
        # Set by cancel() when the cancellation has to be delivered by the
        # next call to _step().
        self._must_cancel = False
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    # On Python 3.3 or older, objects with a destructor that are part of a
    # reference cycle are never destroyed. That's not the case any more on
    # Python 3.4 thanks to the PEP 442.
    if compat.PY34:
        def __del__(self):
            # Report tasks that are garbage collected while still pending,
            # unless logging was disabled through _log_destroy_pending.
            if self._state == futures._PENDING and self._log_destroy_pending:
                context = {
                    'task': self,
                    'message': 'Task was destroyed but it is pending!',
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
            futures.Future.__del__(self)

    def _repr_info(self):
        # Delegates to the helper in base_tasks.
        return base_tasks._task_repr_info(self)

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Return False if the task is already done, True otherwise.
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, exc=None):
        """Run the coroutine until its next yield and process the outcome.

        If *exc* is None the coroutine is resumed with send(None),
        otherwise *exc* is thrown into it.  A pending cancellation
        request (_must_cancel) replaces *exc* with a CancelledError.
        """
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}'.format(self, exc)
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        # Make this task visible through current_task() while it runs.
        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            # The coroutine returned; exc.value is its return value.
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                self.set_exception(futures.CancelledError())
            else:
                self.set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            # Exceptions that are not Exception subclasses are recorded on
            # the task and also re-raised to the caller of _step().
            self.set_exception(exc)
            raise
        else:
            # The coroutine yielded `result`; decide how to continue.
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if result._loop is not self._loop:
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'Task {!r} got Future {!r} attached to a '
                            'different loop'.format(self, result)))
                elif blocking:
                    if result is self:
                        self._loop.call_soon(
                            self._step,
                            RuntimeError(
                                'Task cannot await on itself: {!r}'.format(
                                    self)))
                    else:
                        # Wait for the future: _wakeup() will resume the
                        # coroutine once it is done.
                        result._asyncio_future_blocking = False
                        result.add_done_callback(self._wakeup)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        """Done callback of the awaited future: resume the coroutine.

        An exception of *future* is thrown into the coroutine through
        _step(exc); otherwise the coroutine is resumed with a plain
        _step().
        """
        try:
            future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self._step()
        self = None  # Needed to break cycles when an exception occurs.
263
264
# Keep the pure-Python implementation reachable under a private name,
# because Task may be rebound just below.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    # The _asyncio module is not available: Task stays the Python class.
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task
275
276
# wait() and as_completed() similar to those in PEP 3148.

# The return_when values accepted by wait() are the constants defined by
# concurrent.futures, re-exported under the same names.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
282
283
@coroutine
def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures and coroutines given by fs to complete.

    The fs collection must not be empty, and it must not be a single
    future or coroutine.

    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = yield from asyncio.wait(fs)

    Raises TypeError when fs is itself a future or a coroutine, and
    ValueError when fs is empty or return_when is not one of
    FIRST_COMPLETED, FIRST_EXCEPTION or ALL_COMPLETED.

    Note: This does not raise TimeoutError!  Futures that aren't done
    when the timeout occurs are returned in the second set.
    """
    # A lone awaitable is a common mistake; reject it explicitly.
    is_single_awaitable = futures.isfuture(fs) or coroutines.iscoroutine(fs)
    if is_single_awaitable:
        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    accepted_modes = (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED)
    if return_when not in accepted_modes:
        raise ValueError('Invalid return_when value: {}'.format(return_when))

    loop = events.get_event_loop() if loop is None else loop

    # Deduplicate the arguments first, then wrap each one in a future.
    wrapped = set()
    for item in set(fs):
        wrapped.add(ensure_future(item, loop=loop))

    outcome = yield from _wait(wrapped, timeout, return_when, loop)
    return outcome
314
315
Victor Stinner59e08022014-08-28 11:19:25 +0200316def _release_waiter(waiter, *args):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700317 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200318 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700319
320
@coroutine
def wait_for(fut, timeout, *, loop=None):
    """Wait for the single Future or coroutine to complete, with timeout.

    Coroutine will be wrapped in Task.

    Returns result of the Future or coroutine.  When a timeout occurs,
    it cancels the task and raises TimeoutError.  To avoid the task
    cancellation, wrap it in shield().

    If the wait is cancelled, the task is also cancelled.

    A timeout of None waits without any deadline.

    This function is a coroutine.
    """
    if loop is None:
        loop = events.get_event_loop()

    if timeout is None:
        # No deadline requested: delegate directly to the awaitable.
        value = yield from fut
        return value

    # `waiter` is completed either by the timer or by fut finishing,
    # whichever happens first.
    waiter = loop.create_future()
    timer = loop.call_later(timeout, _release_waiter, waiter)
    release = functools.partial(_release_waiter, waiter)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(release)

    try:
        # wait until the future completes or the timeout
        try:
            yield from waiter
        except futures.CancelledError:
            # The wait itself was cancelled: propagate to the inner future.
            fut.remove_done_callback(release)
            fut.cancel()
            raise

        if not fut.done():
            # The timer fired first.
            fut.remove_done_callback(release)
            fut.cancel()
            raise futures.TimeoutError()
        return fut.result()
    finally:
        timer.cancel()
365
366
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a non-empty collection of Futures.  Returns
    the pair of sets (done, pending) describing the state of the futures
    once the wait condition is met or the timeout expires.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    timer = None
    if timeout is not None:
        timer = loop.call_later(timeout, _release_waiter, waiter)
    remaining = len(fs)

    def _on_completion(f):
        nonlocal remaining
        remaining -= 1
        # Decide whether this completion satisfies the wait condition.
        satisfied = remaining <= 0 or return_when == FIRST_COMPLETED
        if not satisfied and return_when == FIRST_EXCEPTION:
            satisfied = not f.cancelled() and f.exception() is not None
        if not satisfied:
            return
        if timer is not None:
            timer.cancel()
        if not waiter.done():
            waiter.set_result(None)

    for fut in fs:
        fut.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timer is not None:
            timer.cancel()

    done = set()
    pending = set()
    for fut in fs:
        fut.remove_done_callback(_on_completion)
        bucket = done if fut.done() else pending
        bucket.add(fut)
    return done, pending
409
410
411# This is *not* a @coroutine! It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    When waiting for the yielded coroutines you'll get the results (or
    exceptions!) of the original Futures (or coroutines), in the order
    in which and as soon as they complete.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = yield from f  # The 'yield from' may raise.
            # Use result.

    If a timeout is specified, the 'yield from' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Raises TypeError (on first iteration) when fs is itself a future or
    a coroutine.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
    if loop is None:
        loop = events.get_event_loop()
    pending = set()
    for item in set(fs):
        pending.add(ensure_future(item, loop=loop))
    from .queues import Queue  # Import here to avoid circular import problem.
    finished = Queue(loop=loop)
    timer = None

    def _on_timeout():
        for fut in pending:
            fut.remove_done_callback(_on_completion)
            # Queue a dummy value for _wait_for_one().
            finished.put_nowait(None)
        # Can't remove the futures one by one while iterating the set.
        pending.clear()

    def _on_completion(fut):
        if not pending:
            return  # _on_timeout() was here first.
        pending.remove(fut)
        finished.put_nowait(fut)
        if not pending and timer is not None:
            timer.cancel()

    @coroutine
    def _wait_for_one():
        fut = yield from finished.get()
        if fut is None:
            # Dummy value from _on_timeout().
            raise futures.TimeoutError
        return fut.result()  # May raise fut.exception().

    for fut in pending:
        fut.add_done_callback(_on_completion)
    if pending and timeout is not None:
        timer = loop.call_later(timeout, _on_timeout)
    # One coroutine per future; the count is fixed before any completes.
    expected = len(pending)
    for _ in range(expected):
        yield _wait_for_one()
466
467
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    *result* is returned to the caller once the delay has elapsed.  A
    delay equal to 0 only yields to the event loop once.
    """
    if delay == 0:
        # A bare yield hands control back to the event loop once.
        yield
        return result

    if loop is None:
        loop = events.get_event_loop()
    waiter = loop.create_future()
    timer = waiter._loop.call_later(
        delay, futures._set_result_unless_cancelled, waiter, result)
    try:
        value = yield from waiter
        return value
    finally:
        # Also reached when the sleep is interrupted before the timer fires.
        timer.cancel()
485
486
def async_(coro_or_future, *, loop=None):
    """Wrap a coroutine in a future.

    If the argument is a Future, it is returned directly.

    This function is deprecated in 3.5. Use asyncio.ensure_future() instead.
    """
    warnings.warn(
        "asyncio.async() function is deprecated, use ensure_future()",
        DeprecationWarning,
        stacklevel=2)
    return ensure_future(coro_or_future, loop=loop)


# Silence DeprecationWarning: the function is defined as async_ and then
# published under the name 'async' through globals(), so the name never
# has to be written as an identifier in this file.
async_.__name__ = 'async'
globals()['async'] = async_
del async_
505
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400506
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.

    Raises ValueError when a Future is passed together with a loop that
    is not the Future's own loop, and TypeError when the argument is not
    a Future, a coroutine or an awaitable.
    """
    if futures.isfuture(coro_or_future):
        # An explicit loop is only accepted when it is the future's loop.
        agrees = loop is None or loop is coro_or_future._loop
        if not agrees:
            raise ValueError('loop argument must agree with Future')
        return coro_or_future

    if coroutines.iscoroutine(coro_or_future):
        target_loop = events.get_event_loop() if loop is None else loop
        task = target_loop.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task

    if compat.PY35 and inspect.isawaitable(coro_or_future):
        # Turn the awaitable into a coroutine and wrap that one instead.
        wrapper = _wrap_awaitable(coro_or_future)
        return ensure_future(wrapper, loop=loop)

    raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                    'required')
Yury Selivanov620279b2015-10-02 15:00:19 -0400528
529
@coroutine
def _wrap_awaitable(awaitable):
    """Helper for asyncio.ensure_future().

    Turns an awaitable (an object with __await__) into a coroutine, so
    that ensure_future() can wrap it in a Task afterwards.
    """
    outcome = yield from awaitable.__await__()
    return outcome
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700538
539
540class _GatheringFuture(futures.Future):
541 """Helper for gather().
542
543 This overrides cancel() to cancel all the children and act more
544 like Task.cancel(), which doesn't immediately mark itself as
545 cancelled.
546 """
547
548 def __init__(self, children, *, loop=None):
549 super().__init__(loop=loop)
550 self._children = children
551
552 def cancel(self):
553 if self.done():
554 return False
Yury Selivanov3d676152016-10-21 17:22:17 -0400555 ret = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700556 for child in self._children:
Yury Selivanov3d676152016-10-21 17:22:17 -0400557 if child.cancel():
558 ret = True
559 return ret
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700560
561
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines
    or futures.

    Coroutines will be wrapped in a future and scheduled in the event
    loop. They will not necessarily be scheduled in the same order as
    passed in.

    All futures must share the same event loop.  If all the tasks are
    done successfully, the returned future's result is the list of
    results (in the order of the original sequence, not necessarily
    the order of results arrival).  If *return_exceptions* is True,
    exceptions in the tasks are treated the same as successful
    results, and gathered in the result list; otherwise, the first
    raised exception will be immediately propagated to the returned
    future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)
    """
    if not coros_or_futures:
        # Nothing to wait for: hand back an already completed future.
        if loop is None:
            loop = events.get_event_loop()
        empty = loop.create_future()
        empty.set_result([])
        return empty

    # Map every distinct argument to its future, so that an argument
    # passed several times is only wrapped once.
    future_for = {}
    for arg in set(coros_or_futures):
        if futures.isfuture(arg):
            child = arg
            if loop is not None and child._loop is not loop:
                raise ValueError("futures are tied to different event loops")
        else:
            child = ensure_future(arg, loop=loop)
            # The caller cannot control this future, the "destroy pending task"
            # warning should not be emitted.
            child._log_destroy_pending = False
        if loop is None:
            loop = child._loop
        future_for[arg] = child

    children = [future_for[arg] for arg in coros_or_futures]
    total = len(children)
    outer = _GatheringFuture(children, loop=loop)
    completed = 0
    results = [None] * total

    def _done_callback(i, fut):
        nonlocal completed
        if outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if fut.cancelled():
            outcome, failed = futures.CancelledError(), True
        elif fut._exception is not None:
            # Calling exception() marks the exception as retrieved.
            outcome, failed = fut.exception(), True
        else:
            outcome, failed = fut._result, False
        if failed and not return_exceptions:
            outer.set_exception(outcome)
            return
        results[i] = outcome
        completed += 1
        if completed == total:
            outer.set_result(results)

    for index, child in enumerate(children):
        child.add_done_callback(functools.partial(_done_callback, index))
    return outer
644
645
def shield(arg, *, loop=None):
    """Wait for a future, shielding it from cancellation.

    The statement

        res = yield from shield(something())

    is exactly equivalent to the statement

        res = yield from something()

    *except* that if the coroutine containing it is cancelled, the
    task running in something() is not cancelled.  From the POV of
    something(), the cancellation did not happen.  But its caller is
    still cancelled, so the yield-from expression still raises
    CancelledError.  Note: If something() is cancelled by other means
    this will still cancel shield().

    If you want to completely ignore cancellation (not recommended)
    you can combine shield() with a try/except clause, as follows:

        try:
            res = yield from shield(something())
        except CancelledError:
            res = None
    """
    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Shortcut.
        return inner
    outer = inner._loop.create_future()

    def _done_callback(inner):
        if outer.cancelled():
            if not inner.cancelled():
                # Mark inner's result as retrieved.
                inner.exception()
            return

        if inner.cancelled():
            outer.cancel()
            return

        # Mirror the outcome of the inner future onto the outer one.
        error = inner.exception()
        if error is None:
            outer.set_result(inner.result())
        else:
            outer.set_exception(error)

    inner.add_done_callback(_done_callback)
    return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700697
698
def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    Return a concurrent.futures.Future to access the result.

    Raises TypeError when *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    result_future = concurrent.futures.Future()

    def callback():
        # Runs inside the loop: wrap the coroutine and link its outcome to
        # the concurrent future handed back to the caller.
        try:
            task = ensure_future(coro, loop=loop)
            futures._chain_future(task, result_future)
        except Exception as exc:
            if result_future.set_running_or_notify_cancel():
                result_future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return result_future