blob: e4533000e7985f3a429e4a699ebacc79105e944a [file] [log] [blame]
"""Support for tasks, coroutines and the scheduler."""

# Names exported by a star-import of this module.  Note that 'async' is
# listed here as a string; the function itself is defined further down as
# ``async_`` and published under the name 'async' through globals().
__all__ = ['Task',
           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
           'wait', 'wait_for', 'as_completed', 'sleep', 'async',
           'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
           ]
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import concurrent.futures
10import functools
11import inspect
Yury Selivanov59eb9a42015-05-11 14:48:38 -040012import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013import weakref
14
Yury Selivanova0c1ba62016-10-28 12:52:37 -040015from . import base_tasks
Victor Stinner71080fc2015-07-25 02:23:21 +020016from . import compat
Victor Stinnerf951d282014-06-29 00:46:45 +020017from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import events
19from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020020from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
class Task(futures.Future):
    """A coroutine wrapped in a Future.

    The task drives the wrapped coroutine by calling _step() from the event
    loop.  Each _step() resumes the coroutine until it yields or finishes;
    when the coroutine yields a Future, the task registers _wakeup() on it
    and is resumed once that Future is done.
    """

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # Weak set containing all tasks alive.
    _all_tasks = weakref.WeakSet()

    # Dictionary containing tasks that are currently active in
    # all running event loops.  {EventLoop: Task}
    # An entry is added at the start of _step() and removed in its
    # ``finally`` clause.
    _current_tasks = {}

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        if loop is None:
            loop = events.get_event_loop()
        return cls._current_tasks.get(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        if loop is None:
            loop = events.get_event_loop()
        return {t for t in cls._all_tasks if t._loop is loop}

    def __init__(self, coro, *, loop=None):
        """Wrap *coro* and schedule its first step on the event loop.

        *coro* must satisfy coroutines.iscoroutine(); this is checked with
        an assert only.  *loop* is forwarded to Future.__init__().
        """
        assert coroutines.iscoroutine(coro), repr(coro)
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the newest entry of the recorded creation traceback
            # (presumably this constructor's own frame -- confirm against
            # Future.__init__).
            del self._source_traceback[-1]
        self._coro = coro
        # Future the coroutine is currently blocked on, or None.
        self._fut_waiter = None
        # Set by cancel() when the cancellation has to be delivered by the
        # next _step() call.
        self._must_cancel = False
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    def __del__(self):
        """Report a task that is destroyed while still pending.

        The report goes through the loop's exception handler and is
        skipped when _log_destroy_pending is false.  Future.__del__() is
        always called afterwards.
        """
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        futures.Future.__del__(self)

    def _repr_info(self):
        """Return the repr details, as computed by base_tasks."""
        return base_tasks._task_repr_info(self)

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, exc=None):
        """Resume the coroutine once and act on what it does.

        If *exc* is None the coroutine is resumed with send(None);
        otherwise *exc* is thrown into it.  A pending cancellation request
        (_must_cancel) replaces *exc* with a CancelledError unless *exc*
        already is one.

        Outcomes:
        - the coroutine returns: the task's result is set (or the task is
          failed with CancelledError if cancellation was requested
          meanwhile);
        - the coroutine raises: the task is cancelled or its exception is
          set; exceptions that are not Exception subclasses are also
          re-raised;
        - the coroutine yields: see the ``else`` branch below.
        """
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}'.format(self, exc)
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        # Mark this task as the one currently running on its loop; undone
        # in the ``finally`` clause.
        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                self.set_exception(futures.CancelledError())
            else:
                self.set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            # Record the exception on the task, but let it propagate too.
            self.set_exception(exc)
            raise
        else:
            # The coroutine yielded *result*.  Objects that carry the
            # _asyncio_future_blocking attribute are treated as futures.
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if result._loop is not self._loop:
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'Task {!r} got Future {!r} attached to a '
                            'different loop'.format(self, result)))
                elif blocking:
                    if result is self:
                        self._loop.call_soon(
                            self._step,
                            RuntimeError(
                                'Task cannot await on itself: {!r}'.format(
                                    self)))
                    else:
                        # Park on the future: _wakeup() resumes the task
                        # once it is done.
                        result._asyncio_future_blocking = False
                        result.add_done_callback(self._wakeup)
                        self._fut_waiter = result
                        if self._must_cancel:
                            # cancel() was called while the coroutine was
                            # running; forward it to the new waiter.
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    # The blocking flag is present but false.
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        """Done-callback registered by _step() on the awaited *future*.

        Resumes the coroutine with _step(); if retrieving the future's
        result raises an Exception, that exception is passed to _step() so
        it gets thrown into the coroutine.
        """
        try:
            future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self._step()
        self = None  # Needed to break cycles when an exception occurs.
258
259
# Keep the class defined above reachable under a private name, because the
# public name ``Task`` may be rebound just below.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    # The optional _asyncio module is unavailable: ``Task`` keeps referring
    # to the class defined in this file.
    pass
else:
    # _CTask is needed for tests.
    # From here on ``Task`` names the implementation provided by _asyncio
    # (presumably the C-accelerated one, judging by the _CTask alias).
    Task = _CTask = _asyncio.Task
270
271
# wait() and as_completed() similar to those in PEP 3148.

# Accepted values for the ``return_when`` argument of wait().  They are the
# very same objects as the concurrent.futures constants of the same names.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
277
278
@coroutine
def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures and coroutines in *fs* to finish.

    *fs* must be a non-empty iterable.  Coroutines found in it are
    wrapped in Tasks.

    Returns a pair of sets of Futures: (done, pending).

    Usage:

        done, pending = yield from asyncio.wait(fs)

    Note: TimeoutError is never raised here.  Futures that are still
    unfinished when the timeout expires are reported in the second set.

    Raises TypeError when *fs* is itself a single future or coroutine,
    and ValueError when *fs* is empty or *return_when* is not one of
    FIRST_COMPLETED, FIRST_EXCEPTION or ALL_COMPLETED.
    """
    got_single_awaitable = futures.isfuture(fs) or coroutines.iscoroutine(fs)
    if got_single_awaitable:
        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    accepted_modes = (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED)
    if return_when not in accepted_modes:
        raise ValueError('Invalid return_when value: {}'.format(return_when))

    if loop is None:
        loop = events.get_event_loop()

    # De-duplicate the inputs first, then make sure each one is a future.
    wrapped = set()
    for item in set(fs):
        wrapped.add(ensure_future(item, loop=loop))

    outcome = yield from _wait(wrapped, timeout, return_when, loop)
    return outcome
309
310
def _release_waiter(waiter, *args):
    """Resolve *waiter* with None unless it is already done.

    Any extra positional arguments are accepted and ignored, which lets
    this function be used both as a timer callback and as a future's
    done-callback.
    """
    if waiter.done():
        return
    waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700314
315
@coroutine
def wait_for(fut, timeout, *, loop=None):
    """Wait for a single Future or coroutine to complete, with a timeout.

    A coroutine is wrapped in a Task.

    Returns the result of the Future or coroutine.  If the timeout
    expires first, the task is cancelled and TimeoutError is raised.
    Wrap the argument in shield() to keep it from being cancelled.

    If the wait itself is cancelled, the task is cancelled as well.

    With timeout=None this simply waits for *fut* without any timer.

    This function is a coroutine.
    """
    if loop is None:
        loop = events.get_event_loop()

    if timeout is None:
        outcome = yield from fut
        return outcome

    # ``wakeup`` is resolved by whichever happens first: the timer firing
    # or ``fut`` completing.
    wakeup = loop.create_future()
    timer = loop.call_later(timeout, _release_waiter, wakeup)
    on_done = functools.partial(_release_waiter, wakeup)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(on_done)

    try:
        try:
            yield from wakeup
        except futures.CancelledError:
            # The wait was cancelled: propagate the cancellation to fut.
            fut.remove_done_callback(on_done)
            fut.cancel()
            raise

        if not fut.done():
            # Woken up by the timer, not by fut.
            fut.remove_done_callback(on_done)
            fut.cancel()
            raise futures.TimeoutError()
        return fut.result()
    finally:
        timer.cancel()
360
361
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper that waits on a collection of Futures.

    The fs argument must be a non-empty collection of Futures.  Returns
    the pair of sets (done, pending) once the *return_when* condition is
    met or, if *timeout* is not None, once the timeout has expired.
    """
    assert fs, 'Set of Futures is empty.'
    wakeup = loop.create_future()
    if timeout is None:
        timer = None
    else:
        timer = loop.call_later(timeout, _release_waiter, wakeup)
    remaining = len(fs)

    def _on_completion(f):
        nonlocal remaining
        remaining -= 1
        if remaining > 0 and return_when != FIRST_COMPLETED:
            # Not everything is finished and we were not asked to stop at
            # the first completion: only a real failure under
            # FIRST_EXCEPTION ends the wait early.
            if return_when != FIRST_EXCEPTION:
                return
            if f.cancelled() or f.exception() is None:
                return
        if timer is not None:
            timer.cancel()
        if not wakeup.done():
            wakeup.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        yield from wakeup
    finally:
        if timer is not None:
            timer.cancel()

    for f in fs:
        f.remove_done_callback(_on_completion)
    done = {f for f in fs if f.done()}
    pending = {f for f in fs if not f.done()}
    return done, pending
404
405
# This is *not* a @coroutine!  It is a plain generator function whose
# iterator yields coroutine objects to wait on.
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    Waiting on each yielded coroutine gives the result (or raises the
    exception!) of one of the original Futures (or coroutines), in the
    order in which they complete and as soon as they do.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = yield from f  # The 'yield from' may raise.
            # Use result.

    If a timeout is specified, the 'yield from' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
    if loop is None:
        loop = events.get_event_loop()
    pending = set()
    for item in set(fs):
        pending.add(ensure_future(item, loop=loop))
    from .queues import Queue  # Import here to avoid circular import problem.
    finished = Queue(loop=loop)
    timer = None

    def _on_timeout():
        for fut in pending:
            fut.remove_done_callback(_on_completion)
            # One dummy entry per unfinished future, for _wait_for_one().
            finished.put_nowait(None)
        # Emptied in one go: removing entries inside the loop above would
        # change the set while it is being iterated.
        pending.clear()

    def _on_completion(fut):
        if not pending:
            # _on_timeout() got here first.
            return
        pending.remove(fut)
        finished.put_nowait(fut)
        if timer is not None and not pending:
            timer.cancel()

    @coroutine
    def _wait_for_one():
        fut = yield from finished.get()
        if fut is None:
            # Dummy value queued by _on_timeout().
            raise futures.TimeoutError
        return fut.result()  # May raise fut.exception().

    for fut in pending:
        fut.add_done_callback(_on_completion)
    if pending and timeout is not None:
        timer = loop.call_later(timeout, _on_timeout)
    remaining = len(pending)
    while remaining > 0:
        yield _wait_for_one()
        remaining -= 1
461
462
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    *result* is returned to the caller once the delay has elapsed.  A
    delay equal to 0 performs a single bare yield and returns *result*
    without creating a future or a timer.
    """
    if delay == 0:
        yield
        return result

    if loop is None:
        loop = events.get_event_loop()
    wakeup = loop.create_future()
    timer = wakeup._loop.call_later(
        delay, futures._set_result_unless_cancelled, wakeup, result)
    try:
        value = yield from wakeup
        return value
    finally:
        # Nothing left to fire if we were cancelled before the delay ended.
        timer.cancel()
480
481
def async_(coro_or_future, *, loop=None):
    """Wrap a coroutine in a future.

    A Future argument is returned unchanged.

    Deprecated since 3.5: call asyncio.ensure_future() instead.  Every
    call emits a DeprecationWarning attributed to the caller.
    """
    message = "asyncio.async() function is deprecated, use ensure_future()"
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return ensure_future(coro_or_future, loop=loop)


# Silence DeprecationWarning:
# the function is written as ``async_`` and then published under the name
# 'async' through globals(), after which the temporary name is removed.
async_.__name__ = 'async'
globals()['async'] = async_
del async_
500
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400501
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    A Future argument is returned unchanged; in that case an explicit
    *loop* must be the Future's own loop, otherwise ValueError is raised.
    A coroutine is turned into a task with loop.create_task(), using the
    current event loop when *loop* is None.  Any other awaitable is first
    wrapped in a coroutine.

    Raises TypeError for every other kind of argument.
    """
    if futures.isfuture(coro_or_future):
        same_loop = loop is None or loop is coro_or_future._loop
        if not same_loop:
            raise ValueError('loop argument must agree with Future')
        return coro_or_future

    if coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            # Drop the newest entry of the recorded creation traceback.
            del task._source_traceback[-1]
        return task

    if compat.PY35 and inspect.isawaitable(coro_or_future):
        as_coroutine = _wrap_awaitable(coro_or_future)
        return ensure_future(as_coroutine, loop=loop)

    raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                    'required')
Yury Selivanov620279b2015-10-02 15:00:19 -0400523
524
@coroutine
def _wrap_awaitable(awaitable):
    """Helper for asyncio.ensure_future().

    Turns an awaitable (an object providing __await__) into a coroutine
    that delegates to the iterator returned by awaitable.__await__() and
    returns its final value.
    """
    outcome = yield from awaitable.__await__()
    return outcome
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700533
534
class _GatheringFuture(futures.Future):
    """Helper for gather().

    Its cancel() does not cancel the future itself; it cancels all the
    children instead, which makes it behave more like Task.cancel() in
    that it is not immediately marked as cancelled.
    """

    def __init__(self, children, *, loop=None):
        """Remember *children*, the futures cancel() has to forward to."""
        super().__init__(loop=loop)
        self._children = children

    def cancel(self):
        """Ask every child to cancel itself.

        Returns False when this future is already done or when no child
        accepted the request, True when at least one child did.
        """
        if self.done():
            return False
        # Build the full list first: every child must receive the request,
        # so the calls must not be short-circuited.
        outcomes = [child.cancel() for child in self._children]
        return any(outcomes)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700555
556
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines
    or futures.

    Coroutines will be wrapped in a future and scheduled in the event
    loop, in the order in which they were passed in.  An argument that
    is passed more than once is wrapped only once; every position it
    occupies receives the same result.

    All futures must share the same event loop.  If all the tasks are
    done successfully, the returned future's result is the list of
    results (in the order of the original sequence, not necessarily
    the order of results arrival).  If *return_exceptions* is True,
    exceptions in the tasks are treated the same as successful
    results, and gathered in the result list; otherwise, the first
    raised exception will be immediately propagated to the returned
    future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)

    Raises ValueError if the futures (or the *loop* argument) are tied
    to different event loops.
    """
    if not coros_or_futures:
        if loop is None:
            loop = events.get_event_loop()
        outer = loop.create_future()
        outer.set_result([])
        return outer

    # Walk the arguments in call order so that coroutines are wrapped, and
    # therefore scheduled, deterministically.  Iterating over
    # set(coros_or_futures) would visit them in arbitrary hash order.
    arg_to_fut = {}
    children = []
    for arg in coros_or_futures:
        if arg not in arg_to_fut:
            if not futures.isfuture(arg):
                fut = ensure_future(arg, loop=loop)
                if loop is None:
                    loop = fut._loop
                # The caller cannot control this future, the "destroy
                # pending task" warning should not be emitted.
                fut._log_destroy_pending = False
            else:
                fut = arg
                if loop is None:
                    loop = fut._loop
                elif fut._loop is not loop:
                    raise ValueError(
                        "futures are tied to different event loops")
            arg_to_fut[arg] = fut
        children.append(arg_to_fut[arg])

    nchildren = len(children)
    outer = _GatheringFuture(children, loop=loop)
    nfinished = 0
    results = [None] * nchildren

    def _done_callback(i, fut):
        nonlocal nfinished
        if outer.done():
            # The outer future was already resolved (or cancelled); only
            # make sure a late failure is not reported as never retrieved.
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if fut.cancelled():
            res = futures.CancelledError()
            if not return_exceptions:
                outer.set_exception(res)
                return
        elif fut._exception is not None:
            res = fut.exception()  # Mark exception retrieved.
            if not return_exceptions:
                outer.set_exception(res)
                return
        else:
            res = fut._result
        results[i] = res
        nfinished += 1
        if nfinished == nchildren:
            outer.set_result(results)

    for i, fut in enumerate(children):
        fut.add_done_callback(functools.partial(_done_callback, i))
    return outer
639
640
def shield(arg, *, loop=None):
    """Wait for a future, shielding it from cancellation.

    The statement

        res = yield from shield(something())

    is exactly equivalent to the statement

        res = yield from something()

    *except* that if the coroutine containing it is cancelled, the
    task running in something() is not cancelled.  From the point of
    view of something(), the cancellation did not happen.  Its caller
    is still cancelled though, so the yield-from expression still
    raises CancelledError.  Note: if something() is cancelled by other
    means, shield() is cancelled too.

    If you want to completely ignore cancellation (not recommended)
    you can combine shield() with a try/except clause, as follows:

        try:
            res = yield from shield(something())
        except CancelledError:
            res = None
    """
    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Shortcut: nothing left to protect.
        return inner
    outer = inner._loop.create_future()

    def _done_callback(inner):
        if outer.cancelled():
            # Nobody is waiting any more; just make sure a failure of
            # inner counts as retrieved.
            if not inner.cancelled():
                inner.exception()
            return

        if inner.cancelled():
            outer.cancel()
            return

        exc = inner.exception()
        if exc is None:
            outer.set_result(inner.result())
        else:
            outer.set_exception(exc)

    inner.add_done_callback(_done_callback)
    return outer
691 return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700692
693
def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    The coroutine is wrapped in a future from inside *loop*, through
    loop.call_soon_threadsafe().

    Return a concurrent.futures.Future to access the result.

    Raises TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    result = concurrent.futures.Future()

    def callback():
        try:
            task = ensure_future(coro, loop=loop)
            futures._chain_future(task, result)
        except Exception as exc:
            # Hand the failure to the waiting side as well, unless the
            # concurrent future has been cancelled in the meantime.
            if result.set_running_or_notify_cancel():
                result.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return result