blob: 5d744c3d30eea8fbb65d91ed5cff157bd14f088b [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
Victor Stinnerf951d282014-06-29 00:46:45 +02003__all__ = ['Task',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
5 'wait', 'wait_for', 'as_completed', 'sleep', 'async',
Guido van Rossum841d9ee2015-10-03 08:31:42 -07006 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007 ]
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import concurrent.futures
10import functools
11import inspect
Yury Selivanov59eb9a42015-05-11 14:48:38 -040012import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013import weakref
14
Yury Selivanova0c1ba62016-10-28 12:52:37 -040015from . import base_tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020016from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import events
18from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020019from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022class Task(futures.Future):
23 """A coroutine wrapped in a Future."""
24
25 # An important invariant maintained while a Task not done:
26 #
27 # - Either _fut_waiter is None, and _step() is scheduled;
28 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
29 #
30 # The only transition from the latter to the former is through
31 # _wakeup(). When _fut_waiter is not None, one of its callbacks
32 # must be _wakeup().
33
34 # Weak set containing all tasks alive.
35 _all_tasks = weakref.WeakSet()
36
Guido van Rossum1a605ed2013-12-06 12:57:40 -080037 # Dictionary containing tasks that are currently active in
38 # all running event loops. {EventLoop: Task}
39 _current_tasks = {}
40
Victor Stinnerfe22e092014-12-04 23:00:13 +010041 # If False, don't log a message if the task is destroyed whereas its
42 # status is still pending
43 _log_destroy_pending = True
44
Guido van Rossum1a605ed2013-12-06 12:57:40 -080045 @classmethod
46 def current_task(cls, loop=None):
47 """Return the currently running task in an event loop or None.
48
49 By default the current task for the current event loop is returned.
50
51 None is returned when called not in the context of a Task.
52 """
53 if loop is None:
54 loop = events.get_event_loop()
55 return cls._current_tasks.get(loop)
56
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070057 @classmethod
58 def all_tasks(cls, loop=None):
59 """Return a set of all tasks for an event loop.
60
61 By default all tasks for the current event loop are returned.
62 """
63 if loop is None:
64 loop = events.get_event_loop()
65 return {t for t in cls._all_tasks if t._loop is loop}
66
67 def __init__(self, coro, *, loop=None):
Victor Stinner15cc6782015-01-09 00:09:10 +010068 assert coroutines.iscoroutine(coro), repr(coro)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070069 super().__init__(loop=loop)
Victor Stinner80f53aa2014-06-27 13:52:20 +020070 if self._source_traceback:
71 del self._source_traceback[-1]
Yury Selivanov1ad08a52015-05-28 10:52:19 -040072 self._coro = coro
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070073 self._fut_waiter = None
74 self._must_cancel = False
75 self._loop.call_soon(self._step)
76 self.__class__._all_tasks.add(self)
77
INADA Naoki3e2ad8e2017-04-25 10:57:18 +090078 def __del__(self):
79 if self._state == futures._PENDING and self._log_destroy_pending:
80 context = {
81 'task': self,
82 'message': 'Task was destroyed but it is pending!',
83 }
84 if self._source_traceback:
85 context['source_traceback'] = self._source_traceback
86 self._loop.call_exception_handler(context)
87 futures.Future.__del__(self)
Victor Stinnera02f81f2014-06-24 22:37:53 +020088
Victor Stinner313a9802014-07-29 12:58:23 +020089 def _repr_info(self):
Yury Selivanova0c1ba62016-10-28 12:52:37 -040090 return base_tasks._task_repr_info(self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070091
92 def get_stack(self, *, limit=None):
93 """Return the list of stack frames for this task's coroutine.
94
Victor Stinnerd87de832014-12-02 17:57:04 +010095 If the coroutine is not done, this returns the stack where it is
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070096 suspended. If the coroutine has completed successfully or was
97 cancelled, this returns an empty list. If the coroutine was
98 terminated by an exception, this returns the list of traceback
99 frames.
100
101 The frames are always ordered from oldest to newest.
102
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500103 The optional limit gives the maximum number of frames to
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700104 return; by default all available frames are returned. Its
105 meaning differs depending on whether a stack or a traceback is
106 returned: the newest frames of a stack are returned, but the
107 oldest frames of a traceback are returned. (This matches the
108 behavior of the traceback module.)
109
110 For reasons beyond our control, only one stack frame is
111 returned for a suspended coroutine.
112 """
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400113 return base_tasks._task_get_stack(self, limit)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700114
115 def print_stack(self, *, limit=None, file=None):
116 """Print the stack or traceback for this task's coroutine.
117
118 This produces output similar to that of the traceback module,
119 for the frames retrieved by get_stack(). The limit argument
120 is passed to get_stack(). The file argument is an I/O stream
R David Murray8e069d52014-09-24 13:13:45 -0400121 to which the output is written; by default output is written
122 to sys.stderr.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700123 """
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400124 return base_tasks._task_print_stack(self, limit, file)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700125
126 def cancel(self):
R David Murray8e069d52014-09-24 13:13:45 -0400127 """Request that this task cancel itself.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200128
Victor Stinner8d213572014-06-02 23:06:46 +0200129 This arranges for a CancelledError to be thrown into the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200130 wrapped coroutine on the next cycle through the event loop.
131 The coroutine then has a chance to clean up or even deny
132 the request using try/except/finally.
133
R David Murray8e069d52014-09-24 13:13:45 -0400134 Unlike Future.cancel, this does not guarantee that the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200135 task will be cancelled: the exception might be caught and
R David Murray8e069d52014-09-24 13:13:45 -0400136 acted upon, delaying cancellation of the task or preventing
137 cancellation completely. The task may also return a value or
138 raise a different exception.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200139
140 Immediately after this method is called, Task.cancelled() will
141 not return True (unless the task was already cancelled). A
142 task will be marked as cancelled when the wrapped coroutine
143 terminates with a CancelledError exception (even if cancel()
144 was not called).
145 """
Yury Selivanov7ce1c6f2017-06-11 13:49:18 +0000146 self._log_traceback = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700147 if self.done():
148 return False
149 if self._fut_waiter is not None:
150 if self._fut_waiter.cancel():
151 # Leave self._fut_waiter; it may be a Task that
152 # catches and ignores the cancellation so we may have
153 # to cancel it again later.
154 return True
155 # It must be the case that self._step is already scheduled.
156 self._must_cancel = True
157 return True
158
Yury Selivanovd59bba82015-11-20 12:41:03 -0500159 def _step(self, exc=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700160 assert not self.done(), \
Yury Selivanovd59bba82015-11-20 12:41:03 -0500161 '_step(): already done: {!r}, {!r}'.format(self, exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700162 if self._must_cancel:
163 if not isinstance(exc, futures.CancelledError):
164 exc = futures.CancelledError()
165 self._must_cancel = False
166 coro = self._coro
167 self._fut_waiter = None
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800168
169 self.__class__._current_tasks[self._loop] = self
Yury Selivanovd59bba82015-11-20 12:41:03 -0500170 # Call either coro.throw(exc) or coro.send(None).
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700171 try:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500172 if exc is None:
173 # We use the `send` method directly, because coroutines
174 # don't have `__iter__` and `__next__` methods.
175 result = coro.send(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700176 else:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500177 result = coro.throw(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700178 except StopIteration as exc:
INADA Naoki991adca2017-05-11 21:18:38 +0900179 if self._must_cancel:
180 # Task is cancelled right before coro stops.
181 self._must_cancel = False
182 self.set_exception(futures.CancelledError())
183 else:
184 self.set_result(exc.value)
Yury Selivanov4145c832016-10-09 12:19:12 -0400185 except futures.CancelledError:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700186 super().cancel() # I.e., Future.cancel(self).
187 except Exception as exc:
188 self.set_exception(exc)
189 except BaseException as exc:
190 self.set_exception(exc)
191 raise
192 else:
Guido van Rossum1140a032016-09-09 12:54:54 -0700193 blocking = getattr(result, '_asyncio_future_blocking', None)
194 if blocking is not None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700195 # Yielded Future must come from Future.__iter__().
Yury Selivanov0ac3a0c2015-12-11 11:33:59 -0500196 if result._loop is not self._loop:
197 self._loop.call_soon(
198 self._step,
199 RuntimeError(
200 'Task {!r} got Future {!r} attached to a '
201 'different loop'.format(self, result)))
Guido van Rossum1140a032016-09-09 12:54:54 -0700202 elif blocking:
Yury Selivanov4145c832016-10-09 12:19:12 -0400203 if result is self:
204 self._loop.call_soon(
205 self._step,
206 RuntimeError(
207 'Task cannot await on itself: {!r}'.format(
208 self)))
209 else:
210 result._asyncio_future_blocking = False
211 result.add_done_callback(self._wakeup)
212 self._fut_waiter = result
213 if self._must_cancel:
214 if self._fut_waiter.cancel():
215 self._must_cancel = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700216 else:
217 self._loop.call_soon(
Yury Selivanovd59bba82015-11-20 12:41:03 -0500218 self._step,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700219 RuntimeError(
220 'yield was used instead of yield from '
221 'in task {!r} with {!r}'.format(self, result)))
222 elif result is None:
223 # Bare yield relinquishes control for one event loop iteration.
224 self._loop.call_soon(self._step)
225 elif inspect.isgenerator(result):
226 # Yielding a generator is just wrong.
227 self._loop.call_soon(
Yury Selivanovd59bba82015-11-20 12:41:03 -0500228 self._step,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700229 RuntimeError(
230 'yield was used instead of yield from for '
231 'generator in task {!r} with {}'.format(
232 self, result)))
233 else:
234 # Yielding something else is an error.
235 self._loop.call_soon(
Yury Selivanovd59bba82015-11-20 12:41:03 -0500236 self._step,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700237 RuntimeError(
238 'Task got bad yield: {!r}'.format(result)))
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800239 finally:
240 self.__class__._current_tasks.pop(self._loop)
Victor Stinnerd74ac822014-03-04 23:07:08 +0100241 self = None # Needed to break cycles when an exception occurs.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700242
243 def _wakeup(self, future):
244 try:
Yury Selivanova4afc482015-11-16 15:12:10 -0500245 future.result()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700246 except Exception as exc:
247 # This may also be a cancellation.
Yury Selivanovd59bba82015-11-20 12:41:03 -0500248 self._step(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700249 else:
Yury Selivanova4afc482015-11-16 15:12:10 -0500250 # Don't pass the value of `future.result()` explicitly,
251 # as `Future.__iter__` and `Future.__await__` don't need it.
252 # If we call `_step(value, None)` instead of `_step()`,
253 # Python eval loop would use `.send(value)` method call,
254 # instead of `__next__()`, which is slower for futures
255 # that return non-generator iterators from their `__iter__`.
256 self._step()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700257 self = None # Needed to break cycles when an exception occurs.
258
259
Yury Selivanova0c1ba62016-10-28 12:52:37 -0400260_PyTask = Task
261
262
263try:
264 import _asyncio
265except ImportError:
266 pass
267else:
268 # _CTask is needed for tests.
269 Task = _CTask = _asyncio.Task
270
271
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700272# wait() and as_completed() similar to those in PEP 3148.
273
274FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
275FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
276ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
277
278
279@coroutine
280def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
281 """Wait for the Futures and coroutines given by fs to complete.
282
Victor Stinnerdb74d982014-06-10 11:16:05 +0200283 The sequence futures must not be empty.
284
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700285 Coroutines will be wrapped in Tasks.
286
287 Returns two sets of Future: (done, pending).
288
289 Usage:
290
291 done, pending = yield from asyncio.wait(fs)
292
293 Note: This does not raise TimeoutError! Futures that aren't done
294 when the timeout occurs are returned in the second set.
295 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700296 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100297 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700298 if not fs:
299 raise ValueError('Set of coroutines/Futures is empty.')
Victor Stinnere931f7b2014-07-16 18:50:39 +0200300 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
301 raise ValueError('Invalid return_when value: {}'.format(return_when))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700302
303 if loop is None:
304 loop = events.get_event_loop()
305
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400306 fs = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700307
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700308 return (yield from _wait(fs, timeout, return_when, loop))
309
310
Victor Stinner59e08022014-08-28 11:19:25 +0200311def _release_waiter(waiter, *args):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700312 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200313 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700314
315
316@coroutine
317def wait_for(fut, timeout, *, loop=None):
318 """Wait for the single Future or coroutine to complete, with timeout.
319
320 Coroutine will be wrapped in Task.
321
Victor Stinner421e49b2014-01-23 17:40:59 +0100322 Returns result of the Future or coroutine. When a timeout occurs,
323 it cancels the task and raises TimeoutError. To avoid the task
324 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700325
Victor Stinner922bc2c2015-01-15 16:29:10 +0100326 If the wait is cancelled, the task is also cancelled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700327
Victor Stinner922bc2c2015-01-15 16:29:10 +0100328 This function is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700329 """
330 if loop is None:
331 loop = events.get_event_loop()
332
Guido van Rossum48c66c32014-01-29 14:30:38 -0800333 if timeout is None:
334 return (yield from fut)
335
Victor K4d071892017-10-05 19:04:39 +0300336 if timeout <= 0:
337 fut = ensure_future(fut, loop=loop)
338
339 if fut.done():
340 return fut.result()
341
342 fut.cancel()
343 raise futures.TimeoutError()
344
Yury Selivanov7661db62016-05-16 15:38:39 -0400345 waiter = loop.create_future()
Victor Stinner59e08022014-08-28 11:19:25 +0200346 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
347 cb = functools.partial(_release_waiter, waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700348
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400349 fut = ensure_future(fut, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700350 fut.add_done_callback(cb)
351
352 try:
Victor Stinner59e08022014-08-28 11:19:25 +0200353 # wait until the future completes or the timeout
Victor Stinner922bc2c2015-01-15 16:29:10 +0100354 try:
355 yield from waiter
356 except futures.CancelledError:
357 fut.remove_done_callback(cb)
358 fut.cancel()
359 raise
Victor Stinner59e08022014-08-28 11:19:25 +0200360
361 if fut.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700362 return fut.result()
363 else:
364 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100365 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700366 raise futures.TimeoutError()
367 finally:
368 timeout_handle.cancel()
369
370
371@coroutine
372def _wait(fs, timeout, return_when, loop):
Victor Stinner2ba8ece2016-04-01 21:39:09 +0200373 """Internal helper for wait() and wait_for().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700374
375 The fs argument must be a collection of Futures.
376 """
377 assert fs, 'Set of Futures is empty.'
Yury Selivanov7661db62016-05-16 15:38:39 -0400378 waiter = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700379 timeout_handle = None
380 if timeout is not None:
381 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
382 counter = len(fs)
383
384 def _on_completion(f):
385 nonlocal counter
386 counter -= 1
387 if (counter <= 0 or
388 return_when == FIRST_COMPLETED or
389 return_when == FIRST_EXCEPTION and (not f.cancelled() and
390 f.exception() is not None)):
391 if timeout_handle is not None:
392 timeout_handle.cancel()
393 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200394 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700395
396 for f in fs:
397 f.add_done_callback(_on_completion)
398
399 try:
400 yield from waiter
401 finally:
402 if timeout_handle is not None:
403 timeout_handle.cancel()
404
405 done, pending = set(), set()
406 for f in fs:
407 f.remove_done_callback(_on_completion)
408 if f.done():
409 done.add(f)
410 else:
411 pending.add(f)
412 return done, pending
413
414
415# This is *not* a @coroutine! It is just an iterator (yielding Futures).
416def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800417 """Return an iterator whose values are coroutines.
418
419 When waiting for the yielded coroutines you'll get the results (or
420 exceptions!) of the original Futures (or coroutines), in the order
421 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700422
423 This differs from PEP 3148; the proper way to use this is:
424
425 for f in as_completed(fs):
426 result = yield from f # The 'yield from' may raise.
427 # Use result.
428
Guido van Rossumb58f0532014-02-12 17:58:19 -0800429 If a timeout is specified, the 'yield from' will raise
430 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700431
432 Note: The futures 'f' are not necessarily members of fs.
433 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700434 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100435 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700436 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400437 todo = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800438 from .queues import Queue # Import here to avoid circular import problem.
439 done = Queue(loop=loop)
440 timeout_handle = None
441
442 def _on_timeout():
443 for f in todo:
444 f.remove_done_callback(_on_completion)
445 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
446 todo.clear() # Can't do todo.remove(f) in the loop.
447
448 def _on_completion(f):
449 if not todo:
450 return # _on_timeout() was here first.
451 todo.remove(f)
452 done.put_nowait(f)
453 if not todo and timeout_handle is not None:
454 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700455
456 @coroutine
457 def _wait_for_one():
Guido van Rossumb58f0532014-02-12 17:58:19 -0800458 f = yield from done.get()
459 if f is None:
460 # Dummy value from _on_timeout().
461 raise futures.TimeoutError
462 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700463
Guido van Rossumb58f0532014-02-12 17:58:19 -0800464 for f in todo:
465 f.add_done_callback(_on_completion)
466 if todo and timeout is not None:
467 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700468 for _ in range(len(todo)):
469 yield _wait_for_one()
470
471
472@coroutine
473def sleep(delay, result=None, *, loop=None):
474 """Coroutine that completes after a given time (in seconds)."""
Yury Selivanovade04122015-11-05 14:29:04 -0500475 if delay == 0:
476 yield
477 return result
478
Yury Selivanov7661db62016-05-16 15:38:39 -0400479 if loop is None:
480 loop = events.get_event_loop()
481 future = loop.create_future()
Victor Stinnera9acbe82014-07-05 15:29:41 +0200482 h = future._loop.call_later(delay,
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500483 futures._set_result_unless_cancelled,
484 future, result)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700485 try:
486 return (yield from future)
487 finally:
488 h.cancel()
489
490
Yury Selivanov4357cf62016-09-15 13:49:08 -0400491def async_(coro_or_future, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700492 """Wrap a coroutine in a future.
493
494 If the argument is a Future, it is returned directly.
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400495
496 This function is deprecated in 3.5. Use asyncio.ensure_future() instead.
497 """
498
499 warnings.warn("asyncio.async() function is deprecated, use ensure_future()",
Mariatta Wijaya4e7ff8b2017-02-06 22:03:00 -0800500 DeprecationWarning,
501 stacklevel=2)
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400502
503 return ensure_future(coro_or_future, loop=loop)
504
Yury Selivanov4357cf62016-09-15 13:49:08 -0400505# Silence DeprecationWarning:
506globals()['async'] = async_
507async_.__name__ = 'async'
508del async_
509
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400510
511def ensure_future(coro_or_future, *, loop=None):
Yury Selivanov620279b2015-10-02 15:00:19 -0400512 """Wrap a coroutine or an awaitable in a future.
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400513
514 If the argument is a Future, it is returned directly.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700515 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700516 if futures.isfuture(coro_or_future):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700517 if loop is not None and loop is not coro_or_future._loop:
518 raise ValueError('loop argument must agree with Future')
519 return coro_or_future
Victor Stinnerf951d282014-06-29 00:46:45 +0200520 elif coroutines.iscoroutine(coro_or_future):
Victor Stinner896a25a2014-07-08 11:29:25 +0200521 if loop is None:
522 loop = events.get_event_loop()
523 task = loop.create_task(coro_or_future)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200524 if task._source_traceback:
525 del task._source_traceback[-1]
526 return task
Victor Stinner3f438a92017-11-28 14:43:52 +0100527 elif inspect.isawaitable(coro_or_future):
Yury Selivanov620279b2015-10-02 15:00:19 -0400528 return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700529 else:
Charles Renwickae5b3262017-04-21 16:49:48 -0400530 raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
531 'required')
Yury Selivanov620279b2015-10-02 15:00:19 -0400532
533
534@coroutine
535def _wrap_awaitable(awaitable):
536 """Helper for asyncio.ensure_future().
537
538 Wraps awaitable (an object with __await__) into a coroutine
539 that will later be wrapped in a Task by ensure_future().
540 """
541 return (yield from awaitable.__await__())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700542
543
544class _GatheringFuture(futures.Future):
545 """Helper for gather().
546
547 This overrides cancel() to cancel all the children and act more
548 like Task.cancel(), which doesn't immediately mark itself as
549 cancelled.
550 """
551
552 def __init__(self, children, *, loop=None):
553 super().__init__(loop=loop)
554 self._children = children
555
556 def cancel(self):
557 if self.done():
558 return False
Yury Selivanov3d676152016-10-21 17:22:17 -0400559 ret = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700560 for child in self._children:
Yury Selivanov3d676152016-10-21 17:22:17 -0400561 if child.cancel():
562 ret = True
563 return ret
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700564
565
566def gather(*coros_or_futures, loop=None, return_exceptions=False):
567 """Return a future aggregating results from the given coroutines
568 or futures.
569
Guido van Rossume3c65a72016-09-30 08:17:15 -0700570 Coroutines will be wrapped in a future and scheduled in the event
571 loop. They will not necessarily be scheduled in the same order as
572 passed in.
573
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700574 All futures must share the same event loop. If all the tasks are
575 done successfully, the returned future's result is the list of
576 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500577 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700578 exceptions in the tasks are treated the same as successful
579 results, and gathered in the result list; otherwise, the first
580 raised exception will be immediately propagated to the returned
581 future.
582
583 Cancellation: if the outer Future is cancelled, all children (that
584 have not completed yet) are also cancelled. If any child is
585 cancelled, this is treated as if it raised CancelledError --
586 the outer Future is *not* cancelled in this case. (This is to
587 prevent the cancellation of one child to cause other children to
588 be cancelled.)
589 """
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200590 if not coros_or_futures:
Yury Selivanov7661db62016-05-16 15:38:39 -0400591 if loop is None:
592 loop = events.get_event_loop()
593 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700594 outer.set_result([])
595 return outer
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200596
597 arg_to_fut = {}
598 for arg in set(coros_or_futures):
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700599 if not futures.isfuture(arg):
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400600 fut = ensure_future(arg, loop=loop)
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200601 if loop is None:
602 loop = fut._loop
603 # The caller cannot control this future, the "destroy pending task"
604 # warning should not be emitted.
605 fut._log_destroy_pending = False
606 else:
607 fut = arg
608 if loop is None:
609 loop = fut._loop
610 elif fut._loop is not loop:
611 raise ValueError("futures are tied to different event loops")
612 arg_to_fut[arg] = fut
613
614 children = [arg_to_fut[arg] for arg in coros_or_futures]
615 nchildren = len(children)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700616 outer = _GatheringFuture(children, loop=loop)
617 nfinished = 0
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200618 results = [None] * nchildren
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700619
620 def _done_callback(i, fut):
621 nonlocal nfinished
Victor Stinner3531d902015-01-09 01:42:52 +0100622 if outer.done():
623 if not fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700624 # Mark exception retrieved.
625 fut.exception()
626 return
Victor Stinner3531d902015-01-09 01:42:52 +0100627
Victor Stinner29342622015-01-29 14:15:19 +0100628 if fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700629 res = futures.CancelledError()
630 if not return_exceptions:
631 outer.set_exception(res)
632 return
633 elif fut._exception is not None:
634 res = fut.exception() # Mark exception retrieved.
635 if not return_exceptions:
636 outer.set_exception(res)
637 return
638 else:
639 res = fut._result
640 results[i] = res
641 nfinished += 1
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200642 if nfinished == nchildren:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700643 outer.set_result(results)
644
645 for i, fut in enumerate(children):
646 fut.add_done_callback(functools.partial(_done_callback, i))
647 return outer
648
649
650def shield(arg, *, loop=None):
651 """Wait for a future, shielding it from cancellation.
652
653 The statement
654
655 res = yield from shield(something())
656
657 is exactly equivalent to the statement
658
659 res = yield from something()
660
661 *except* that if the coroutine containing it is cancelled, the
662 task running in something() is not cancelled. From the POV of
663 something(), the cancellation did not happen. But its caller is
664 still cancelled, so the yield-from expression still raises
665 CancelledError. Note: If something() is cancelled by other means
666 this will still cancel shield().
667
668 If you want to completely ignore cancellation (not recommended)
669 you can combine shield() with a try/except clause, as follows:
670
671 try:
672 res = yield from shield(something())
673 except CancelledError:
674 res = None
675 """
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400676 inner = ensure_future(arg, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700677 if inner.done():
678 # Shortcut.
679 return inner
680 loop = inner._loop
Yury Selivanov7661db62016-05-16 15:38:39 -0400681 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700682
683 def _done_callback(inner):
684 if outer.cancelled():
Victor Stinner3531d902015-01-09 01:42:52 +0100685 if not inner.cancelled():
686 # Mark inner's result as retrieved.
687 inner.exception()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700688 return
Victor Stinner3531d902015-01-09 01:42:52 +0100689
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700690 if inner.cancelled():
691 outer.cancel()
692 else:
693 exc = inner.exception()
694 if exc is not None:
695 outer.set_exception(exc)
696 else:
697 outer.set_result(inner.result())
698
699 inner.add_done_callback(_done_callback)
700 return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700701
702
703def run_coroutine_threadsafe(coro, loop):
704 """Submit a coroutine object to a given event loop.
705
706 Return a concurrent.futures.Future to access the result.
707 """
708 if not coroutines.iscoroutine(coro):
709 raise TypeError('A coroutine object is required')
710 future = concurrent.futures.Future()
711
712 def callback():
Guido van Rossum601953b2015-10-05 16:20:00 -0700713 try:
714 futures._chain_future(ensure_future(coro, loop=loop), future)
715 except Exception as exc:
716 if future.set_running_or_notify_cancel():
717 future.set_exception(exc)
718 raise
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700719
720 loop.call_soon_threadsafe(callback)
721 return future