blob: c23d06afd7c33b5f009574f433d16dff566bcc97 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
Victor Stinnerf951d282014-06-29 00:46:45 +02003__all__ = ['Task',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
5 'wait', 'wait_for', 'as_completed', 'sleep', 'async',
Guido van Rossum841d9ee2015-10-03 08:31:42 -07006 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007 ]
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import concurrent.futures
10import functools
11import inspect
Andrew Svetlov5f841b52017-12-09 00:23:48 +020012import types
Yury Selivanov59eb9a42015-05-11 14:48:38 -040013import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import weakref
15
Yury Selivanova0c1ba62016-10-28 12:52:37 -040016from . import base_tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020017from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import events
19from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020020from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023class Task(futures.Future):
24 """A coroutine wrapped in a Future."""
25
26 # An important invariant maintained while a Task not done:
27 #
28 # - Either _fut_waiter is None, and _step() is scheduled;
29 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
30 #
31 # The only transition from the latter to the former is through
32 # _wakeup(). When _fut_waiter is not None, one of its callbacks
33 # must be _wakeup().
34
35 # Weak set containing all tasks alive.
36 _all_tasks = weakref.WeakSet()
37
Guido van Rossum1a605ed2013-12-06 12:57:40 -080038 # Dictionary containing tasks that are currently active in
39 # all running event loops. {EventLoop: Task}
40 _current_tasks = {}
41
Victor Stinnerfe22e092014-12-04 23:00:13 +010042 # If False, don't log a message if the task is destroyed whereas its
43 # status is still pending
44 _log_destroy_pending = True
45
Guido van Rossum1a605ed2013-12-06 12:57:40 -080046 @classmethod
47 def current_task(cls, loop=None):
48 """Return the currently running task in an event loop or None.
49
50 By default the current task for the current event loop is returned.
51
52 None is returned when called not in the context of a Task.
53 """
54 if loop is None:
55 loop = events.get_event_loop()
56 return cls._current_tasks.get(loop)
57
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070058 @classmethod
59 def all_tasks(cls, loop=None):
60 """Return a set of all tasks for an event loop.
61
62 By default all tasks for the current event loop are returned.
63 """
64 if loop is None:
65 loop = events.get_event_loop()
66 return {t for t in cls._all_tasks if t._loop is loop}
67
68 def __init__(self, coro, *, loop=None):
Victor Stinner15cc6782015-01-09 00:09:10 +010069 assert coroutines.iscoroutine(coro), repr(coro)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070070 super().__init__(loop=loop)
Victor Stinner80f53aa2014-06-27 13:52:20 +020071 if self._source_traceback:
72 del self._source_traceback[-1]
Yury Selivanov1ad08a52015-05-28 10:52:19 -040073 self._coro = coro
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070074 self._fut_waiter = None
75 self._must_cancel = False
76 self._loop.call_soon(self._step)
77 self.__class__._all_tasks.add(self)
78
INADA Naoki3e2ad8e2017-04-25 10:57:18 +090079 def __del__(self):
80 if self._state == futures._PENDING and self._log_destroy_pending:
81 context = {
82 'task': self,
83 'message': 'Task was destroyed but it is pending!',
84 }
85 if self._source_traceback:
86 context['source_traceback'] = self._source_traceback
87 self._loop.call_exception_handler(context)
88 futures.Future.__del__(self)
Victor Stinnera02f81f2014-06-24 22:37:53 +020089
Victor Stinner313a9802014-07-29 12:58:23 +020090 def _repr_info(self):
Yury Selivanova0c1ba62016-10-28 12:52:37 -040091 return base_tasks._task_repr_info(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070092
93 def get_stack(self, *, limit=None):
94 """Return the list of stack frames for this task's coroutine.
95
Victor Stinnerd87de832014-12-02 17:57:04 +010096 If the coroutine is not done, this returns the stack where it is
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070097 suspended. If the coroutine has completed successfully or was
98 cancelled, this returns an empty list. If the coroutine was
99 terminated by an exception, this returns the list of traceback
100 frames.
101
102 The frames are always ordered from oldest to newest.
103
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500104 The optional limit gives the maximum number of frames to
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700105 return; by default all available frames are returned. Its
106 meaning differs depending on whether a stack or a traceback is
107 returned: the newest frames of a stack are returned, but the
108 oldest frames of a traceback are returned. (This matches the
109 behavior of the traceback module.)
110
111 For reasons beyond our control, only one stack frame is
112 returned for a suspended coroutine.
113 """
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400114 return base_tasks._task_get_stack(self, limit)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700115
116 def print_stack(self, *, limit=None, file=None):
117 """Print the stack or traceback for this task's coroutine.
118
119 This produces output similar to that of the traceback module,
120 for the frames retrieved by get_stack(). The limit argument
121 is passed to get_stack(). The file argument is an I/O stream
R David Murray8e069d52014-09-24 13:13:45 -0400122 to which the output is written; by default output is written
123 to sys.stderr.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700124 """
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400125 return base_tasks._task_print_stack(self, limit, file)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700126
127 def cancel(self):
R David Murray8e069d52014-09-24 13:13:45 -0400128 """Request that this task cancel itself.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200129
Victor Stinner8d213572014-06-02 23:06:46 +0200130 This arranges for a CancelledError to be thrown into the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200131 wrapped coroutine on the next cycle through the event loop.
132 The coroutine then has a chance to clean up or even deny
133 the request using try/except/finally.
134
R David Murray8e069d52014-09-24 13:13:45 -0400135 Unlike Future.cancel, this does not guarantee that the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200136 task will be cancelled: the exception might be caught and
R David Murray8e069d52014-09-24 13:13:45 -0400137 acted upon, delaying cancellation of the task or preventing
138 cancellation completely. The task may also return a value or
139 raise a different exception.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200140
141 Immediately after this method is called, Task.cancelled() will
142 not return True (unless the task was already cancelled). A
143 task will be marked as cancelled when the wrapped coroutine
144 terminates with a CancelledError exception (even if cancel()
145 was not called).
146 """
Yury Selivanov7ce1c6f2017-06-11 13:49:18 +0000147 self._log_traceback = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700148 if self.done():
149 return False
150 if self._fut_waiter is not None:
151 if self._fut_waiter.cancel():
152 # Leave self._fut_waiter; it may be a Task that
153 # catches and ignores the cancellation so we may have
154 # to cancel it again later.
155 return True
156 # It must be the case that self._step is already scheduled.
157 self._must_cancel = True
158 return True
159
Yury Selivanovd59bba82015-11-20 12:41:03 -0500160 def _step(self, exc=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700161 assert not self.done(), \
Yury Selivanovd59bba82015-11-20 12:41:03 -0500162 '_step(): already done: {!r}, {!r}'.format(self, exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700163 if self._must_cancel:
164 if not isinstance(exc, futures.CancelledError):
165 exc = futures.CancelledError()
166 self._must_cancel = False
167 coro = self._coro
168 self._fut_waiter = None
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800169
170 self.__class__._current_tasks[self._loop] = self
Yury Selivanovd59bba82015-11-20 12:41:03 -0500171 # Call either coro.throw(exc) or coro.send(None).
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700172 try:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500173 if exc is None:
174 # We use the `send` method directly, because coroutines
175 # don't have `__iter__` and `__next__` methods.
176 result = coro.send(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700177 else:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500178 result = coro.throw(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700179 except StopIteration as exc:
INADA Naoki991adca2017-05-11 21:18:38 +0900180 if self._must_cancel:
181 # Task is cancelled right before coro stops.
182 self._must_cancel = False
183 self.set_exception(futures.CancelledError())
184 else:
185 self.set_result(exc.value)
Yury Selivanov4145c832016-10-09 12:19:12 -0400186 except futures.CancelledError:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700187 super().cancel() # I.e., Future.cancel(self).
188 except Exception as exc:
189 self.set_exception(exc)
190 except BaseException as exc:
191 self.set_exception(exc)
192 raise
193 else:
Guido van Rossum1140a032016-09-09 12:54:54 -0700194 blocking = getattr(result, '_asyncio_future_blocking', None)
195 if blocking is not None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700196 # Yielded Future must come from Future.__iter__().
Yury Selivanov0ac3a0c2015-12-11 11:33:59 -0500197 if result._loop is not self._loop:
198 self._loop.call_soon(
199 self._step,
200 RuntimeError(
201 'Task {!r} got Future {!r} attached to a '
202 'different loop'.format(self, result)))
Guido van Rossum1140a032016-09-09 12:54:54 -0700203 elif blocking:
Yury Selivanov4145c832016-10-09 12:19:12 -0400204 if result is self:
205 self._loop.call_soon(
206 self._step,
207 RuntimeError(
208 'Task cannot await on itself: {!r}'.format(
209 self)))
210 else:
211 result._asyncio_future_blocking = False
212 result.add_done_callback(self._wakeup)
213 self._fut_waiter = result
214 if self._must_cancel:
215 if self._fut_waiter.cancel():
216 self._must_cancel = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700217 else:
218 self._loop.call_soon(
Yury Selivanovd59bba82015-11-20 12:41:03 -0500219 self._step,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700220 RuntimeError(
221 'yield was used instead of yield from '
222 'in task {!r} with {!r}'.format(self, result)))
223 elif result is None:
224 # Bare yield relinquishes control for one event loop iteration.
225 self._loop.call_soon(self._step)
226 elif inspect.isgenerator(result):
227 # Yielding a generator is just wrong.
228 self._loop.call_soon(
Yury Selivanovd59bba82015-11-20 12:41:03 -0500229 self._step,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700230 RuntimeError(
231 'yield was used instead of yield from for '
232 'generator in task {!r} with {}'.format(
233 self, result)))
234 else:
235 # Yielding something else is an error.
236 self._loop.call_soon(
Yury Selivanovd59bba82015-11-20 12:41:03 -0500237 self._step,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700238 RuntimeError(
239 'Task got bad yield: {!r}'.format(result)))
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800240 finally:
241 self.__class__._current_tasks.pop(self._loop)
Victor Stinnerd74ac822014-03-04 23:07:08 +0100242 self = None # Needed to break cycles when an exception occurs.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700243
244 def _wakeup(self, future):
245 try:
Yury Selivanova4afc482015-11-16 15:12:10 -0500246 future.result()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700247 except Exception as exc:
248 # This may also be a cancellation.
Yury Selivanovd59bba82015-11-20 12:41:03 -0500249 self._step(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700250 else:
Yury Selivanova4afc482015-11-16 15:12:10 -0500251 # Don't pass the value of `future.result()` explicitly,
252 # as `Future.__iter__` and `Future.__await__` don't need it.
253 # If we call `_step(value, None)` instead of `_step()`,
254 # Python eval loop would use `.send(value)` method call,
255 # instead of `__next__()`, which is slower for futures
256 # that return non-generator iterators from their `__iter__`.
257 self._step()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700258 self = None # Needed to break cycles when an exception occurs.
259
260
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400261_PyTask = Task
262
263
264try:
265 import _asyncio
266except ImportError:
267 pass
268else:
269 # _CTask is needed for tests.
270 Task = _CTask = _asyncio.Task
271
272
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700273# wait() and as_completed() similar to those in PEP 3148.
274
275FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
276FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
277ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
278
279
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200280async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700281 """Wait for the Futures and coroutines given by fs to complete.
282
Victor Stinnerdb74d982014-06-10 11:16:05 +0200283 The sequence futures must not be empty.
284
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700285 Coroutines will be wrapped in Tasks.
286
287 Returns two sets of Future: (done, pending).
288
289 Usage:
290
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200291 done, pending = await asyncio.wait(fs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700292
293 Note: This does not raise TimeoutError! Futures that aren't done
294 when the timeout occurs are returned in the second set.
295 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700296 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100297 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700298 if not fs:
299 raise ValueError('Set of coroutines/Futures is empty.')
Victor Stinnere931f7b2014-07-16 18:50:39 +0200300 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
301 raise ValueError('Invalid return_when value: {}'.format(return_when))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700302
303 if loop is None:
304 loop = events.get_event_loop()
305
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400306 fs = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700307
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200308 return await _wait(fs, timeout, return_when, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700309
310
Victor Stinner59e08022014-08-28 11:19:25 +0200311def _release_waiter(waiter, *args):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700312 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200313 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700314
315
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200316async def wait_for(fut, timeout, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700317 """Wait for the single Future or coroutine to complete, with timeout.
318
319 Coroutine will be wrapped in Task.
320
Victor Stinner421e49b2014-01-23 17:40:59 +0100321 Returns result of the Future or coroutine. When a timeout occurs,
322 it cancels the task and raises TimeoutError. To avoid the task
323 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700324
Victor Stinner922bc2c2015-01-15 16:29:10 +0100325 If the wait is cancelled, the task is also cancelled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700326
Victor Stinner922bc2c2015-01-15 16:29:10 +0100327 This function is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700328 """
329 if loop is None:
330 loop = events.get_event_loop()
331
Guido van Rossum48c66c32014-01-29 14:30:38 -0800332 if timeout is None:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200333 return await fut
Guido van Rossum48c66c32014-01-29 14:30:38 -0800334
Victor K4d071892017-10-05 19:04:39 +0300335 if timeout <= 0:
336 fut = ensure_future(fut, loop=loop)
337
338 if fut.done():
339 return fut.result()
340
341 fut.cancel()
342 raise futures.TimeoutError()
343
Yury Selivanov7661db62016-05-16 15:38:39 -0400344 waiter = loop.create_future()
Victor Stinner59e08022014-08-28 11:19:25 +0200345 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
346 cb = functools.partial(_release_waiter, waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700347
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400348 fut = ensure_future(fut, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700349 fut.add_done_callback(cb)
350
351 try:
Victor Stinner59e08022014-08-28 11:19:25 +0200352 # wait until the future completes or the timeout
Victor Stinner922bc2c2015-01-15 16:29:10 +0100353 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200354 await waiter
Victor Stinner922bc2c2015-01-15 16:29:10 +0100355 except futures.CancelledError:
356 fut.remove_done_callback(cb)
357 fut.cancel()
358 raise
Victor Stinner59e08022014-08-28 11:19:25 +0200359
360 if fut.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700361 return fut.result()
362 else:
363 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100364 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700365 raise futures.TimeoutError()
366 finally:
367 timeout_handle.cancel()
368
369
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200370async def _wait(fs, timeout, return_when, loop):
Victor Stinner2ba8ece2016-04-01 21:39:09 +0200371 """Internal helper for wait() and wait_for().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700372
373 The fs argument must be a collection of Futures.
374 """
375 assert fs, 'Set of Futures is empty.'
Yury Selivanov7661db62016-05-16 15:38:39 -0400376 waiter = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700377 timeout_handle = None
378 if timeout is not None:
379 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
380 counter = len(fs)
381
382 def _on_completion(f):
383 nonlocal counter
384 counter -= 1
385 if (counter <= 0 or
386 return_when == FIRST_COMPLETED or
387 return_when == FIRST_EXCEPTION and (not f.cancelled() and
388 f.exception() is not None)):
389 if timeout_handle is not None:
390 timeout_handle.cancel()
391 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200392 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700393
394 for f in fs:
395 f.add_done_callback(_on_completion)
396
397 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200398 await waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700399 finally:
400 if timeout_handle is not None:
401 timeout_handle.cancel()
402
403 done, pending = set(), set()
404 for f in fs:
405 f.remove_done_callback(_on_completion)
406 if f.done():
407 done.add(f)
408 else:
409 pending.add(f)
410 return done, pending
411
412
413# This is *not* a @coroutine! It is just an iterator (yielding Futures).
414def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800415 """Return an iterator whose values are coroutines.
416
417 When waiting for the yielded coroutines you'll get the results (or
418 exceptions!) of the original Futures (or coroutines), in the order
419 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700420
421 This differs from PEP 3148; the proper way to use this is:
422
423 for f in as_completed(fs):
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200424 result = await f # The 'await' may raise.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700425 # Use result.
426
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200427 If a timeout is specified, the 'await' will raise
Guido van Rossumb58f0532014-02-12 17:58:19 -0800428 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700429
430 Note: The futures 'f' are not necessarily members of fs.
431 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700432 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100433 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700434 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400435 todo = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800436 from .queues import Queue # Import here to avoid circular import problem.
437 done = Queue(loop=loop)
438 timeout_handle = None
439
440 def _on_timeout():
441 for f in todo:
442 f.remove_done_callback(_on_completion)
443 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
444 todo.clear() # Can't do todo.remove(f) in the loop.
445
446 def _on_completion(f):
447 if not todo:
448 return # _on_timeout() was here first.
449 todo.remove(f)
450 done.put_nowait(f)
451 if not todo and timeout_handle is not None:
452 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700453
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200454 async def _wait_for_one():
455 f = await done.get()
Guido van Rossumb58f0532014-02-12 17:58:19 -0800456 if f is None:
457 # Dummy value from _on_timeout().
458 raise futures.TimeoutError
459 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700460
Guido van Rossumb58f0532014-02-12 17:58:19 -0800461 for f in todo:
462 f.add_done_callback(_on_completion)
463 if todo and timeout is not None:
464 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700465 for _ in range(len(todo)):
466 yield _wait_for_one()
467
468
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200469@types.coroutine
470def __sleep0():
471 """Skip one event loop run cycle.
472
473 This is a private helper for 'asyncio.sleep()', used
474 when the 'delay' is set to 0. It uses a bare 'yield'
475 expression (which Task._step knows how to handle)
476 instead of creating a Future object.
477 """
478 yield
479
480
481async def sleep(delay, result=None, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700482 """Coroutine that completes after a given time (in seconds)."""
Yury Selivanovade04122015-11-05 14:29:04 -0500483 if delay == 0:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200484 await __sleep0()
Yury Selivanovade04122015-11-05 14:29:04 -0500485 return result
486
Yury Selivanov7661db62016-05-16 15:38:39 -0400487 if loop is None:
488 loop = events.get_event_loop()
489 future = loop.create_future()
Victor Stinnera9acbe82014-07-05 15:29:41 +0200490 h = future._loop.call_later(delay,
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500491 futures._set_result_unless_cancelled,
492 future, result)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700493 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200494 return await future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700495 finally:
496 h.cancel()
497
498
Yury Selivanov4357cf62016-09-15 13:49:08 -0400499def async_(coro_or_future, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700500 """Wrap a coroutine in a future.
501
502 If the argument is a Future, it is returned directly.
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400503
504 This function is deprecated in 3.5. Use asyncio.ensure_future() instead.
505 """
506
507 warnings.warn("asyncio.async() function is deprecated, use ensure_future()",
Mariatta Wijaya4e7ff8b2017-02-06 22:03:00 -0800508 DeprecationWarning,
509 stacklevel=2)
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400510
511 return ensure_future(coro_or_future, loop=loop)
512
Yury Selivanov4357cf62016-09-15 13:49:08 -0400513# Silence DeprecationWarning:
514globals()['async'] = async_
515async_.__name__ = 'async'
516del async_
517
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400518
519def ensure_future(coro_or_future, *, loop=None):
Yury Selivanov620279b2015-10-02 15:00:19 -0400520 """Wrap a coroutine or an awaitable in a future.
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400521
522 If the argument is a Future, it is returned directly.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700523 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700524 if futures.isfuture(coro_or_future):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700525 if loop is not None and loop is not coro_or_future._loop:
526 raise ValueError('loop argument must agree with Future')
527 return coro_or_future
Victor Stinnerf951d282014-06-29 00:46:45 +0200528 elif coroutines.iscoroutine(coro_or_future):
Victor Stinner896a25a2014-07-08 11:29:25 +0200529 if loop is None:
530 loop = events.get_event_loop()
531 task = loop.create_task(coro_or_future)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200532 if task._source_traceback:
533 del task._source_traceback[-1]
534 return task
Victor Stinner3f438a92017-11-28 14:43:52 +0100535 elif inspect.isawaitable(coro_or_future):
Yury Selivanov620279b2015-10-02 15:00:19 -0400536 return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700537 else:
Charles Renwickae5b3262017-04-21 16:49:48 -0400538 raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
539 'required')
Yury Selivanov620279b2015-10-02 15:00:19 -0400540
541
542@coroutine
543def _wrap_awaitable(awaitable):
544 """Helper for asyncio.ensure_future().
545
546 Wraps awaitable (an object with __await__) into a coroutine
547 that will later be wrapped in a Task by ensure_future().
548 """
549 return (yield from awaitable.__await__())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700550
551
552class _GatheringFuture(futures.Future):
553 """Helper for gather().
554
555 This overrides cancel() to cancel all the children and act more
556 like Task.cancel(), which doesn't immediately mark itself as
557 cancelled.
558 """
559
560 def __init__(self, children, *, loop=None):
561 super().__init__(loop=loop)
562 self._children = children
563
564 def cancel(self):
565 if self.done():
566 return False
Yury Selivanov3d676152016-10-21 17:22:17 -0400567 ret = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700568 for child in self._children:
Yury Selivanov3d676152016-10-21 17:22:17 -0400569 if child.cancel():
570 ret = True
571 return ret
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700572
573
574def gather(*coros_or_futures, loop=None, return_exceptions=False):
575 """Return a future aggregating results from the given coroutines
576 or futures.
577
Guido van Rossume3c65a72016-09-30 08:17:15 -0700578 Coroutines will be wrapped in a future and scheduled in the event
579 loop. They will not necessarily be scheduled in the same order as
580 passed in.
581
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700582 All futures must share the same event loop. If all the tasks are
583 done successfully, the returned future's result is the list of
584 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500585 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700586 exceptions in the tasks are treated the same as successful
587 results, and gathered in the result list; otherwise, the first
588 raised exception will be immediately propagated to the returned
589 future.
590
591 Cancellation: if the outer Future is cancelled, all children (that
592 have not completed yet) are also cancelled. If any child is
593 cancelled, this is treated as if it raised CancelledError --
594 the outer Future is *not* cancelled in this case. (This is to
595 prevent the cancellation of one child to cause other children to
596 be cancelled.)
597 """
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200598 if not coros_or_futures:
Yury Selivanov7661db62016-05-16 15:38:39 -0400599 if loop is None:
600 loop = events.get_event_loop()
601 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700602 outer.set_result([])
603 return outer
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200604
605 arg_to_fut = {}
606 for arg in set(coros_or_futures):
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700607 if not futures.isfuture(arg):
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400608 fut = ensure_future(arg, loop=loop)
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200609 if loop is None:
610 loop = fut._loop
611 # The caller cannot control this future, the "destroy pending task"
612 # warning should not be emitted.
613 fut._log_destroy_pending = False
614 else:
615 fut = arg
616 if loop is None:
617 loop = fut._loop
618 elif fut._loop is not loop:
619 raise ValueError("futures are tied to different event loops")
620 arg_to_fut[arg] = fut
621
622 children = [arg_to_fut[arg] for arg in coros_or_futures]
623 nchildren = len(children)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700624 outer = _GatheringFuture(children, loop=loop)
625 nfinished = 0
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200626 results = [None] * nchildren
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700627
628 def _done_callback(i, fut):
629 nonlocal nfinished
Victor Stinner3531d902015-01-09 01:42:52 +0100630 if outer.done():
631 if not fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700632 # Mark exception retrieved.
633 fut.exception()
634 return
Victor Stinner3531d902015-01-09 01:42:52 +0100635
Victor Stinner29342622015-01-29 14:15:19 +0100636 if fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700637 res = futures.CancelledError()
638 if not return_exceptions:
639 outer.set_exception(res)
640 return
641 elif fut._exception is not None:
642 res = fut.exception() # Mark exception retrieved.
643 if not return_exceptions:
644 outer.set_exception(res)
645 return
646 else:
647 res = fut._result
648 results[i] = res
649 nfinished += 1
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200650 if nfinished == nchildren:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700651 outer.set_result(results)
652
653 for i, fut in enumerate(children):
654 fut.add_done_callback(functools.partial(_done_callback, i))
655 return outer
656
657
658def shield(arg, *, loop=None):
659 """Wait for a future, shielding it from cancellation.
660
661 The statement
662
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200663 res = await shield(something())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700664
665 is exactly equivalent to the statement
666
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200667 res = await something()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700668
669 *except* that if the coroutine containing it is cancelled, the
670 task running in something() is not cancelled. From the POV of
671 something(), the cancellation did not happen. But its caller is
672 still cancelled, so the yield-from expression still raises
673 CancelledError. Note: If something() is cancelled by other means
674 this will still cancel shield().
675
676 If you want to completely ignore cancellation (not recommended)
677 you can combine shield() with a try/except clause, as follows:
678
679 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200680 res = await shield(something())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700681 except CancelledError:
682 res = None
683 """
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400684 inner = ensure_future(arg, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700685 if inner.done():
686 # Shortcut.
687 return inner
688 loop = inner._loop
Yury Selivanov7661db62016-05-16 15:38:39 -0400689 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700690
691 def _done_callback(inner):
692 if outer.cancelled():
Victor Stinner3531d902015-01-09 01:42:52 +0100693 if not inner.cancelled():
694 # Mark inner's result as retrieved.
695 inner.exception()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700696 return
Victor Stinner3531d902015-01-09 01:42:52 +0100697
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700698 if inner.cancelled():
699 outer.cancel()
700 else:
701 exc = inner.exception()
702 if exc is not None:
703 outer.set_exception(exc)
704 else:
705 outer.set_result(inner.result())
706
707 inner.add_done_callback(_done_callback)
708 return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700709
710
711def run_coroutine_threadsafe(coro, loop):
712 """Submit a coroutine object to a given event loop.
713
714 Return a concurrent.futures.Future to access the result.
715 """
716 if not coroutines.iscoroutine(coro):
717 raise TypeError('A coroutine object is required')
718 future = concurrent.futures.Future()
719
720 def callback():
Guido van Rossum601953b2015-10-05 16:20:00 -0700721 try:
722 futures._chain_future(ensure_future(coro, loop=loop), future)
723 except Exception as exc:
724 if future.set_running_or_notify_cancel():
725 future.set_exception(exc)
726 raise
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700727
728 loop.call_soon_threadsafe(callback)
729 return future