blob: cab4998ee44956152d8e5b6d58459f79b0a53d57 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
Victor Stinnerf951d282014-06-29 00:46:45 +02003__all__ = ['Task',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
5 'wait', 'wait_for', 'as_completed', 'sleep', 'async',
Guido van Rossum841d9ee2015-10-03 08:31:42 -07006 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
Andrew Svetlovc07b16b2016-01-11 08:42:49 +02007 'timeout',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008 ]
9
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010import concurrent.futures
11import functools
12import inspect
13import linecache
14import traceback
Yury Selivanov59eb9a42015-05-11 14:48:38 -040015import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import weakref
17
Victor Stinner71080fc2015-07-25 02:23:21 +020018from . import compat
Victor Stinnerf951d282014-06-29 00:46:45 +020019from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import events
21from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020022from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025class Task(futures.Future):
26 """A coroutine wrapped in a Future."""
27
28 # An important invariant maintained while a Task not done:
29 #
30 # - Either _fut_waiter is None, and _step() is scheduled;
31 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
32 #
33 # The only transition from the latter to the former is through
34 # _wakeup(). When _fut_waiter is not None, one of its callbacks
35 # must be _wakeup().
36
37 # Weak set containing all tasks alive.
38 _all_tasks = weakref.WeakSet()
39
Guido van Rossum1a605ed2013-12-06 12:57:40 -080040 # Dictionary containing tasks that are currently active in
41 # all running event loops. {EventLoop: Task}
42 _current_tasks = {}
43
Victor Stinnerfe22e092014-12-04 23:00:13 +010044 # If False, don't log a message if the task is destroyed whereas its
45 # status is still pending
46 _log_destroy_pending = True
47
Guido van Rossum1a605ed2013-12-06 12:57:40 -080048 @classmethod
49 def current_task(cls, loop=None):
50 """Return the currently running task in an event loop or None.
51
52 By default the current task for the current event loop is returned.
53
54 None is returned when called not in the context of a Task.
55 """
56 if loop is None:
57 loop = events.get_event_loop()
58 return cls._current_tasks.get(loop)
59
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070060 @classmethod
61 def all_tasks(cls, loop=None):
62 """Return a set of all tasks for an event loop.
63
64 By default all tasks for the current event loop are returned.
65 """
66 if loop is None:
67 loop = events.get_event_loop()
68 return {t for t in cls._all_tasks if t._loop is loop}
69
70 def __init__(self, coro, *, loop=None):
Victor Stinner15cc6782015-01-09 00:09:10 +010071 assert coroutines.iscoroutine(coro), repr(coro)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070072 super().__init__(loop=loop)
Victor Stinner80f53aa2014-06-27 13:52:20 +020073 if self._source_traceback:
74 del self._source_traceback[-1]
Yury Selivanov1ad08a52015-05-28 10:52:19 -040075 self._coro = coro
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070076 self._fut_waiter = None
77 self._must_cancel = False
78 self._loop.call_soon(self._step)
79 self.__class__._all_tasks.add(self)
80
R David Murray8e069d52014-09-24 13:13:45 -040081 # On Python 3.3 or older, objects with a destructor that are part of a
82 # reference cycle are never destroyed. That's not the case any more on
83 # Python 3.4 thanks to the PEP 442.
Victor Stinner71080fc2015-07-25 02:23:21 +020084 if compat.PY34:
Victor Stinnera02f81f2014-06-24 22:37:53 +020085 def __del__(self):
Victor Stinner98b63912014-06-30 14:51:04 +020086 if self._state == futures._PENDING and self._log_destroy_pending:
Victor Stinner80f53aa2014-06-27 13:52:20 +020087 context = {
Victor Stinnera02f81f2014-06-24 22:37:53 +020088 'task': self,
89 'message': 'Task was destroyed but it is pending!',
Victor Stinner80f53aa2014-06-27 13:52:20 +020090 }
91 if self._source_traceback:
92 context['source_traceback'] = self._source_traceback
93 self._loop.call_exception_handler(context)
Victor Stinnera02f81f2014-06-24 22:37:53 +020094 futures.Future.__del__(self)
95
Victor Stinner313a9802014-07-29 12:58:23 +020096 def _repr_info(self):
97 info = super()._repr_info()
98
Victor Stinner975735f2014-06-25 21:41:58 +020099 if self._must_cancel:
Victor Stinner313a9802014-07-29 12:58:23 +0200100 # replace status
101 info[0] = 'cancelling'
Victor Stinner975735f2014-06-25 21:41:58 +0200102
Victor Stinnerc39ba7d2014-07-11 00:21:27 +0200103 coro = coroutines._format_coroutine(self._coro)
Victor Stinner313a9802014-07-29 12:58:23 +0200104 info.insert(1, 'coro=<%s>' % coro)
Victor Stinner975735f2014-06-25 21:41:58 +0200105
Victor Stinner2dba23a2014-07-03 00:59:00 +0200106 if self._fut_waiter is not None:
Victor Stinner313a9802014-07-29 12:58:23 +0200107 info.insert(2, 'wait_for=%r' % self._fut_waiter)
108 return info
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700109
110 def get_stack(self, *, limit=None):
111 """Return the list of stack frames for this task's coroutine.
112
Victor Stinnerd87de832014-12-02 17:57:04 +0100113 If the coroutine is not done, this returns the stack where it is
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700114 suspended. If the coroutine has completed successfully or was
115 cancelled, this returns an empty list. If the coroutine was
116 terminated by an exception, this returns the list of traceback
117 frames.
118
119 The frames are always ordered from oldest to newest.
120
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500121 The optional limit gives the maximum number of frames to
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700122 return; by default all available frames are returned. Its
123 meaning differs depending on whether a stack or a traceback is
124 returned: the newest frames of a stack are returned, but the
125 oldest frames of a traceback are returned. (This matches the
126 behavior of the traceback module.)
127
128 For reasons beyond our control, only one stack frame is
129 returned for a suspended coroutine.
130 """
131 frames = []
Yury Selivanov23398332015-08-14 15:30:59 -0400132 try:
133 # 'async def' coroutines
134 f = self._coro.cr_frame
135 except AttributeError:
136 f = self._coro.gi_frame
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700137 if f is not None:
138 while f is not None:
139 if limit is not None:
140 if limit <= 0:
141 break
142 limit -= 1
143 frames.append(f)
144 f = f.f_back
145 frames.reverse()
146 elif self._exception is not None:
147 tb = self._exception.__traceback__
148 while tb is not None:
149 if limit is not None:
150 if limit <= 0:
151 break
152 limit -= 1
153 frames.append(tb.tb_frame)
154 tb = tb.tb_next
155 return frames
156
157 def print_stack(self, *, limit=None, file=None):
158 """Print the stack or traceback for this task's coroutine.
159
160 This produces output similar to that of the traceback module,
161 for the frames retrieved by get_stack(). The limit argument
162 is passed to get_stack(). The file argument is an I/O stream
R David Murray8e069d52014-09-24 13:13:45 -0400163 to which the output is written; by default output is written
164 to sys.stderr.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700165 """
166 extracted_list = []
167 checked = set()
168 for f in self.get_stack(limit=limit):
169 lineno = f.f_lineno
170 co = f.f_code
171 filename = co.co_filename
172 name = co.co_name
173 if filename not in checked:
174 checked.add(filename)
175 linecache.checkcache(filename)
176 line = linecache.getline(filename, lineno, f.f_globals)
177 extracted_list.append((filename, lineno, name, line))
178 exc = self._exception
179 if not extracted_list:
180 print('No stack for %r' % self, file=file)
181 elif exc is not None:
182 print('Traceback for %r (most recent call last):' % self,
183 file=file)
184 else:
185 print('Stack for %r (most recent call last):' % self,
186 file=file)
187 traceback.print_list(extracted_list, file=file)
188 if exc is not None:
189 for line in traceback.format_exception_only(exc.__class__, exc):
190 print(line, file=file, end='')
191
192 def cancel(self):
R David Murray8e069d52014-09-24 13:13:45 -0400193 """Request that this task cancel itself.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200194
Victor Stinner8d213572014-06-02 23:06:46 +0200195 This arranges for a CancelledError to be thrown into the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200196 wrapped coroutine on the next cycle through the event loop.
197 The coroutine then has a chance to clean up or even deny
198 the request using try/except/finally.
199
R David Murray8e069d52014-09-24 13:13:45 -0400200 Unlike Future.cancel, this does not guarantee that the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200201 task will be cancelled: the exception might be caught and
R David Murray8e069d52014-09-24 13:13:45 -0400202 acted upon, delaying cancellation of the task or preventing
203 cancellation completely. The task may also return a value or
204 raise a different exception.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200205
206 Immediately after this method is called, Task.cancelled() will
207 not return True (unless the task was already cancelled). A
208 task will be marked as cancelled when the wrapped coroutine
209 terminates with a CancelledError exception (even if cancel()
210 was not called).
211 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700212 if self.done():
213 return False
214 if self._fut_waiter is not None:
215 if self._fut_waiter.cancel():
216 # Leave self._fut_waiter; it may be a Task that
217 # catches and ignores the cancellation so we may have
218 # to cancel it again later.
219 return True
220 # It must be the case that self._step is already scheduled.
221 self._must_cancel = True
222 return True
223
Yury Selivanovd59bba82015-11-20 12:41:03 -0500224 def _step(self, exc=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700225 assert not self.done(), \
Yury Selivanovd59bba82015-11-20 12:41:03 -0500226 '_step(): already done: {!r}, {!r}'.format(self, exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700227 if self._must_cancel:
228 if not isinstance(exc, futures.CancelledError):
229 exc = futures.CancelledError()
230 self._must_cancel = False
231 coro = self._coro
232 self._fut_waiter = None
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800233
234 self.__class__._current_tasks[self._loop] = self
Yury Selivanovd59bba82015-11-20 12:41:03 -0500235 # Call either coro.throw(exc) or coro.send(None).
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700236 try:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500237 if exc is None:
238 # We use the `send` method directly, because coroutines
239 # don't have `__iter__` and `__next__` methods.
240 result = coro.send(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700241 else:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500242 result = coro.throw(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700243 except StopIteration as exc:
244 self.set_result(exc.value)
245 except futures.CancelledError as exc:
246 super().cancel() # I.e., Future.cancel(self).
247 except Exception as exc:
248 self.set_exception(exc)
249 except BaseException as exc:
250 self.set_exception(exc)
251 raise
252 else:
253 if isinstance(result, futures.Future):
254 # Yielded Future must come from Future.__iter__().
Yury Selivanov0ac3a0c2015-12-11 11:33:59 -0500255 if result._loop is not self._loop:
256 self._loop.call_soon(
257 self._step,
258 RuntimeError(
259 'Task {!r} got Future {!r} attached to a '
260 'different loop'.format(self, result)))
261 elif result._blocking:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700262 result._blocking = False
263 result.add_done_callback(self._wakeup)
264 self._fut_waiter = result
Yury Selivanov4c0a09a2015-08-02 16:49:31 -0400265 if self._must_cancel:
266 if self._fut_waiter.cancel():
267 self._must_cancel = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700268 else:
269 self._loop.call_soon(
Yury Selivanovd59bba82015-11-20 12:41:03 -0500270 self._step,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700271 RuntimeError(
272 'yield was used instead of yield from '
273 'in task {!r} with {!r}'.format(self, result)))
274 elif result is None:
275 # Bare yield relinquishes control for one event loop iteration.
276 self._loop.call_soon(self._step)
277 elif inspect.isgenerator(result):
278 # Yielding a generator is just wrong.
279 self._loop.call_soon(
Yury Selivanovd59bba82015-11-20 12:41:03 -0500280 self._step,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700281 RuntimeError(
282 'yield was used instead of yield from for '
283 'generator in task {!r} with {}'.format(
284 self, result)))
285 else:
286 # Yielding something else is an error.
287 self._loop.call_soon(
Yury Selivanovd59bba82015-11-20 12:41:03 -0500288 self._step,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700289 RuntimeError(
290 'Task got bad yield: {!r}'.format(result)))
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800291 finally:
292 self.__class__._current_tasks.pop(self._loop)
Victor Stinnerd74ac822014-03-04 23:07:08 +0100293 self = None # Needed to break cycles when an exception occurs.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700294
295 def _wakeup(self, future):
296 try:
Yury Selivanova4afc482015-11-16 15:12:10 -0500297 future.result()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700298 except Exception as exc:
299 # This may also be a cancellation.
Yury Selivanovd59bba82015-11-20 12:41:03 -0500300 self._step(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700301 else:
Yury Selivanova4afc482015-11-16 15:12:10 -0500302 # Don't pass the value of `future.result()` explicitly,
303 # as `Future.__iter__` and `Future.__await__` don't need it.
304 # If we call `_step(value, None)` instead of `_step()`,
305 # Python eval loop would use `.send(value)` method call,
306 # instead of `__next__()`, which is slower for futures
307 # that return non-generator iterators from their `__iter__`.
308 self._step()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700309 self = None # Needed to break cycles when an exception occurs.
310
311
312# wait() and as_completed() similar to those in PEP 3148.
313
314FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
315FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
316ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
317
318
319@coroutine
320def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
321 """Wait for the Futures and coroutines given by fs to complete.
322
Victor Stinnerdb74d982014-06-10 11:16:05 +0200323 The sequence futures must not be empty.
324
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700325 Coroutines will be wrapped in Tasks.
326
327 Returns two sets of Future: (done, pending).
328
329 Usage:
330
331 done, pending = yield from asyncio.wait(fs)
332
333 Note: This does not raise TimeoutError! Futures that aren't done
334 when the timeout occurs are returned in the second set.
335 """
Victor Stinnerf951d282014-06-29 00:46:45 +0200336 if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100337 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700338 if not fs:
339 raise ValueError('Set of coroutines/Futures is empty.')
Victor Stinnere931f7b2014-07-16 18:50:39 +0200340 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
341 raise ValueError('Invalid return_when value: {}'.format(return_when))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700342
343 if loop is None:
344 loop = events.get_event_loop()
345
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400346 fs = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700347
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700348 return (yield from _wait(fs, timeout, return_when, loop))
349
350
Victor Stinner59e08022014-08-28 11:19:25 +0200351def _release_waiter(waiter, *args):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700352 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200353 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700354
355
356@coroutine
357def wait_for(fut, timeout, *, loop=None):
358 """Wait for the single Future or coroutine to complete, with timeout.
359
360 Coroutine will be wrapped in Task.
361
Victor Stinner421e49b2014-01-23 17:40:59 +0100362 Returns result of the Future or coroutine. When a timeout occurs,
363 it cancels the task and raises TimeoutError. To avoid the task
364 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700365
Victor Stinner922bc2c2015-01-15 16:29:10 +0100366 If the wait is cancelled, the task is also cancelled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700367
Victor Stinner922bc2c2015-01-15 16:29:10 +0100368 This function is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700369 """
370 if loop is None:
371 loop = events.get_event_loop()
372
Guido van Rossum48c66c32014-01-29 14:30:38 -0800373 if timeout is None:
374 return (yield from fut)
375
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700376 waiter = futures.Future(loop=loop)
Victor Stinner59e08022014-08-28 11:19:25 +0200377 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
378 cb = functools.partial(_release_waiter, waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700379
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400380 fut = ensure_future(fut, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700381 fut.add_done_callback(cb)
382
383 try:
Victor Stinner59e08022014-08-28 11:19:25 +0200384 # wait until the future completes or the timeout
Victor Stinner922bc2c2015-01-15 16:29:10 +0100385 try:
386 yield from waiter
387 except futures.CancelledError:
388 fut.remove_done_callback(cb)
389 fut.cancel()
390 raise
Victor Stinner59e08022014-08-28 11:19:25 +0200391
392 if fut.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700393 return fut.result()
394 else:
395 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100396 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700397 raise futures.TimeoutError()
398 finally:
399 timeout_handle.cancel()
400
401
402@coroutine
403def _wait(fs, timeout, return_when, loop):
Victor Stinner2ba8ece2016-04-01 21:39:09 +0200404 """Internal helper for wait() and wait_for().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700405
406 The fs argument must be a collection of Futures.
407 """
408 assert fs, 'Set of Futures is empty.'
409 waiter = futures.Future(loop=loop)
410 timeout_handle = None
411 if timeout is not None:
412 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
413 counter = len(fs)
414
415 def _on_completion(f):
416 nonlocal counter
417 counter -= 1
418 if (counter <= 0 or
419 return_when == FIRST_COMPLETED or
420 return_when == FIRST_EXCEPTION and (not f.cancelled() and
421 f.exception() is not None)):
422 if timeout_handle is not None:
423 timeout_handle.cancel()
424 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200425 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700426
427 for f in fs:
428 f.add_done_callback(_on_completion)
429
430 try:
431 yield from waiter
432 finally:
433 if timeout_handle is not None:
434 timeout_handle.cancel()
435
436 done, pending = set(), set()
437 for f in fs:
438 f.remove_done_callback(_on_completion)
439 if f.done():
440 done.add(f)
441 else:
442 pending.add(f)
443 return done, pending
444
445
446# This is *not* a @coroutine! It is just an iterator (yielding Futures).
447def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800448 """Return an iterator whose values are coroutines.
449
450 When waiting for the yielded coroutines you'll get the results (or
451 exceptions!) of the original Futures (or coroutines), in the order
452 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700453
454 This differs from PEP 3148; the proper way to use this is:
455
456 for f in as_completed(fs):
457 result = yield from f # The 'yield from' may raise.
458 # Use result.
459
Guido van Rossumb58f0532014-02-12 17:58:19 -0800460 If a timeout is specified, the 'yield from' will raise
461 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700462
463 Note: The futures 'f' are not necessarily members of fs.
464 """
Victor Stinnerf951d282014-06-29 00:46:45 +0200465 if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100466 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700467 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400468 todo = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800469 from .queues import Queue # Import here to avoid circular import problem.
470 done = Queue(loop=loop)
471 timeout_handle = None
472
473 def _on_timeout():
474 for f in todo:
475 f.remove_done_callback(_on_completion)
476 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
477 todo.clear() # Can't do todo.remove(f) in the loop.
478
479 def _on_completion(f):
480 if not todo:
481 return # _on_timeout() was here first.
482 todo.remove(f)
483 done.put_nowait(f)
484 if not todo and timeout_handle is not None:
485 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700486
487 @coroutine
488 def _wait_for_one():
Guido van Rossumb58f0532014-02-12 17:58:19 -0800489 f = yield from done.get()
490 if f is None:
491 # Dummy value from _on_timeout().
492 raise futures.TimeoutError
493 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700494
Guido van Rossumb58f0532014-02-12 17:58:19 -0800495 for f in todo:
496 f.add_done_callback(_on_completion)
497 if todo and timeout is not None:
498 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700499 for _ in range(len(todo)):
500 yield _wait_for_one()
501
502
503@coroutine
504def sleep(delay, result=None, *, loop=None):
505 """Coroutine that completes after a given time (in seconds)."""
Yury Selivanovade04122015-11-05 14:29:04 -0500506 if delay == 0:
507 yield
508 return result
509
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700510 future = futures.Future(loop=loop)
Victor Stinnera9acbe82014-07-05 15:29:41 +0200511 h = future._loop.call_later(delay,
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500512 futures._set_result_unless_cancelled,
513 future, result)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700514 try:
515 return (yield from future)
516 finally:
517 h.cancel()
518
519
520def async(coro_or_future, *, loop=None):
521 """Wrap a coroutine in a future.
522
523 If the argument is a Future, it is returned directly.
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400524
525 This function is deprecated in 3.5. Use asyncio.ensure_future() instead.
526 """
527
528 warnings.warn("asyncio.async() function is deprecated, use ensure_future()",
529 DeprecationWarning)
530
531 return ensure_future(coro_or_future, loop=loop)
532
533
534def ensure_future(coro_or_future, *, loop=None):
Yury Selivanov620279b2015-10-02 15:00:19 -0400535 """Wrap a coroutine or an awaitable in a future.
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400536
537 If the argument is a Future, it is returned directly.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700538 """
539 if isinstance(coro_or_future, futures.Future):
540 if loop is not None and loop is not coro_or_future._loop:
541 raise ValueError('loop argument must agree with Future')
542 return coro_or_future
Victor Stinnerf951d282014-06-29 00:46:45 +0200543 elif coroutines.iscoroutine(coro_or_future):
Victor Stinner896a25a2014-07-08 11:29:25 +0200544 if loop is None:
545 loop = events.get_event_loop()
546 task = loop.create_task(coro_or_future)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200547 if task._source_traceback:
548 del task._source_traceback[-1]
549 return task
Yury Selivanov620279b2015-10-02 15:00:19 -0400550 elif compat.PY35 and inspect.isawaitable(coro_or_future):
551 return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700552 else:
Yury Selivanov620279b2015-10-02 15:00:19 -0400553 raise TypeError('A Future, a coroutine or an awaitable is required')
554
555
556@coroutine
557def _wrap_awaitable(awaitable):
558 """Helper for asyncio.ensure_future().
559
560 Wraps awaitable (an object with __await__) into a coroutine
561 that will later be wrapped in a Task by ensure_future().
562 """
563 return (yield from awaitable.__await__())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700564
565
566class _GatheringFuture(futures.Future):
567 """Helper for gather().
568
569 This overrides cancel() to cancel all the children and act more
570 like Task.cancel(), which doesn't immediately mark itself as
571 cancelled.
572 """
573
574 def __init__(self, children, *, loop=None):
575 super().__init__(loop=loop)
576 self._children = children
577
578 def cancel(self):
579 if self.done():
580 return False
581 for child in self._children:
582 child.cancel()
583 return True
584
585
586def gather(*coros_or_futures, loop=None, return_exceptions=False):
587 """Return a future aggregating results from the given coroutines
588 or futures.
589
590 All futures must share the same event loop. If all the tasks are
591 done successfully, the returned future's result is the list of
592 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500593 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700594 exceptions in the tasks are treated the same as successful
595 results, and gathered in the result list; otherwise, the first
596 raised exception will be immediately propagated to the returned
597 future.
598
599 Cancellation: if the outer Future is cancelled, all children (that
600 have not completed yet) are also cancelled. If any child is
601 cancelled, this is treated as if it raised CancelledError --
602 the outer Future is *not* cancelled in this case. (This is to
603 prevent the cancellation of one child to cause other children to
604 be cancelled.)
605 """
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200606 if not coros_or_futures:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700607 outer = futures.Future(loop=loop)
608 outer.set_result([])
609 return outer
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200610
611 arg_to_fut = {}
612 for arg in set(coros_or_futures):
613 if not isinstance(arg, futures.Future):
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400614 fut = ensure_future(arg, loop=loop)
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200615 if loop is None:
616 loop = fut._loop
617 # The caller cannot control this future, the "destroy pending task"
618 # warning should not be emitted.
619 fut._log_destroy_pending = False
620 else:
621 fut = arg
622 if loop is None:
623 loop = fut._loop
624 elif fut._loop is not loop:
625 raise ValueError("futures are tied to different event loops")
626 arg_to_fut[arg] = fut
627
628 children = [arg_to_fut[arg] for arg in coros_or_futures]
629 nchildren = len(children)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700630 outer = _GatheringFuture(children, loop=loop)
631 nfinished = 0
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200632 results = [None] * nchildren
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700633
634 def _done_callback(i, fut):
635 nonlocal nfinished
Victor Stinner3531d902015-01-09 01:42:52 +0100636 if outer.done():
637 if not fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700638 # Mark exception retrieved.
639 fut.exception()
640 return
Victor Stinner3531d902015-01-09 01:42:52 +0100641
Victor Stinner29342622015-01-29 14:15:19 +0100642 if fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700643 res = futures.CancelledError()
644 if not return_exceptions:
645 outer.set_exception(res)
646 return
647 elif fut._exception is not None:
648 res = fut.exception() # Mark exception retrieved.
649 if not return_exceptions:
650 outer.set_exception(res)
651 return
652 else:
653 res = fut._result
654 results[i] = res
655 nfinished += 1
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200656 if nfinished == nchildren:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700657 outer.set_result(results)
658
659 for i, fut in enumerate(children):
660 fut.add_done_callback(functools.partial(_done_callback, i))
661 return outer
662
663
664def shield(arg, *, loop=None):
665 """Wait for a future, shielding it from cancellation.
666
667 The statement
668
669 res = yield from shield(something())
670
671 is exactly equivalent to the statement
672
673 res = yield from something()
674
675 *except* that if the coroutine containing it is cancelled, the
676 task running in something() is not cancelled. From the POV of
677 something(), the cancellation did not happen. But its caller is
678 still cancelled, so the yield-from expression still raises
679 CancelledError. Note: If something() is cancelled by other means
680 this will still cancel shield().
681
682 If you want to completely ignore cancellation (not recommended)
683 you can combine shield() with a try/except clause, as follows:
684
685 try:
686 res = yield from shield(something())
687 except CancelledError:
688 res = None
689 """
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400690 inner = ensure_future(arg, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700691 if inner.done():
692 # Shortcut.
693 return inner
694 loop = inner._loop
695 outer = futures.Future(loop=loop)
696
697 def _done_callback(inner):
698 if outer.cancelled():
Victor Stinner3531d902015-01-09 01:42:52 +0100699 if not inner.cancelled():
700 # Mark inner's result as retrieved.
701 inner.exception()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700702 return
Victor Stinner3531d902015-01-09 01:42:52 +0100703
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700704 if inner.cancelled():
705 outer.cancel()
706 else:
707 exc = inner.exception()
708 if exc is not None:
709 outer.set_exception(exc)
710 else:
711 outer.set_result(inner.result())
712
713 inner.add_done_callback(_done_callback)
714 return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700715
716
717def run_coroutine_threadsafe(coro, loop):
718 """Submit a coroutine object to a given event loop.
719
720 Return a concurrent.futures.Future to access the result.
721 """
722 if not coroutines.iscoroutine(coro):
723 raise TypeError('A coroutine object is required')
724 future = concurrent.futures.Future()
725
726 def callback():
Guido van Rossum601953b2015-10-05 16:20:00 -0700727 try:
728 futures._chain_future(ensure_future(coro, loop=loop), future)
729 except Exception as exc:
730 if future.set_running_or_notify_cancel():
731 future.set_exception(exc)
732 raise
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700733
734 loop.call_soon_threadsafe(callback)
735 return future
Andrew Svetlovc07b16b2016-01-11 08:42:49 +0200736
737
738def timeout(timeout, *, loop=None):
739 """A factory which produce a context manager with timeout.
740
741 Useful in cases when you want to apply timeout logic around block
742 of code or in cases when asyncio.wait_for is not suitable.
743
744 For example:
745
746 >>> with asyncio.timeout(0.001):
Andrew Svetlov9d976fa2016-01-11 12:25:23 +0200747 ... yield from coro()
Andrew Svetlovc07b16b2016-01-11 08:42:49 +0200748
749
Victor Stinner2ba8ece2016-04-01 21:39:09 +0200750 timeout: timeout value in seconds or None to disable timeout logic
Andrew Svetlovc07b16b2016-01-11 08:42:49 +0200751 loop: asyncio compatible event loop
752 """
753 if loop is None:
754 loop = events.get_event_loop()
755 return _Timeout(timeout, loop=loop)
756
757
758class _Timeout:
759 def __init__(self, timeout, *, loop):
760 self._timeout = timeout
761 self._loop = loop
762 self._task = None
763 self._cancelled = False
764 self._cancel_handler = None
765
766 def __enter__(self):
767 self._task = Task.current_task(loop=self._loop)
768 if self._task is None:
769 raise RuntimeError('Timeout context manager should be used '
770 'inside a task')
Victor Stinner2ba8ece2016-04-01 21:39:09 +0200771 if self._timeout is not None:
772 self._cancel_handler = self._loop.call_later(
773 self._timeout, self._cancel_task)
Andrew Svetlovc07b16b2016-01-11 08:42:49 +0200774 return self
775
776 def __exit__(self, exc_type, exc_val, exc_tb):
777 if exc_type is futures.CancelledError and self._cancelled:
778 self._cancel_handler = None
779 self._task = None
780 raise futures.TimeoutError
Victor Stinner2ba8ece2016-04-01 21:39:09 +0200781 if self._timeout is not None:
782 self._cancel_handler.cancel()
783 self._cancel_handler = None
Andrew Svetlovc07b16b2016-01-11 08:42:49 +0200784 self._task = None
785
786 def _cancel_task(self):
787 self._cancelled = self._task.cancel()