blob: 8852aa5ad2a2eb1e3a2760585048f626005afca4 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
Victor Stinnerf951d282014-06-29 00:46:45 +02003__all__ = ['Task',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
5 'wait', 'wait_for', 'as_completed', 'sleep', 'async',
Guido van Rossum841d9ee2015-10-03 08:31:42 -07006 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007 ]
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import concurrent.futures
10import functools
11import inspect
12import linecache
13import traceback
Yury Selivanov59eb9a42015-05-11 14:48:38 -040014import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015import weakref
16
Victor Stinner71080fc2015-07-25 02:23:21 +020017from . import compat
Victor Stinnerf951d282014-06-29 00:46:45 +020018from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019from . import events
20from . import futures
Victor Stinnerf951d282014-06-29 00:46:45 +020021from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024class Task(futures.Future):
25 """A coroutine wrapped in a Future."""
26
27 # An important invariant maintained while a Task not done:
28 #
29 # - Either _fut_waiter is None, and _step() is scheduled;
30 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
31 #
32 # The only transition from the latter to the former is through
33 # _wakeup(). When _fut_waiter is not None, one of its callbacks
34 # must be _wakeup().
35
36 # Weak set containing all tasks alive.
37 _all_tasks = weakref.WeakSet()
38
Guido van Rossum1a605ed2013-12-06 12:57:40 -080039 # Dictionary containing tasks that are currently active in
40 # all running event loops. {EventLoop: Task}
41 _current_tasks = {}
42
Victor Stinnerfe22e092014-12-04 23:00:13 +010043 # If False, don't log a message if the task is destroyed whereas its
44 # status is still pending
45 _log_destroy_pending = True
46
Guido van Rossum1a605ed2013-12-06 12:57:40 -080047 @classmethod
48 def current_task(cls, loop=None):
49 """Return the currently running task in an event loop or None.
50
51 By default the current task for the current event loop is returned.
52
53 None is returned when called not in the context of a Task.
54 """
55 if loop is None:
56 loop = events.get_event_loop()
57 return cls._current_tasks.get(loop)
58
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070059 @classmethod
60 def all_tasks(cls, loop=None):
61 """Return a set of all tasks for an event loop.
62
63 By default all tasks for the current event loop are returned.
64 """
65 if loop is None:
66 loop = events.get_event_loop()
67 return {t for t in cls._all_tasks if t._loop is loop}
68
69 def __init__(self, coro, *, loop=None):
Victor Stinner15cc6782015-01-09 00:09:10 +010070 assert coroutines.iscoroutine(coro), repr(coro)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070071 super().__init__(loop=loop)
Victor Stinner80f53aa2014-06-27 13:52:20 +020072 if self._source_traceback:
73 del self._source_traceback[-1]
Yury Selivanov1ad08a52015-05-28 10:52:19 -040074 self._coro = coro
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070075 self._fut_waiter = None
76 self._must_cancel = False
77 self._loop.call_soon(self._step)
78 self.__class__._all_tasks.add(self)
79
R David Murray8e069d52014-09-24 13:13:45 -040080 # On Python 3.3 or older, objects with a destructor that are part of a
81 # reference cycle are never destroyed. That's not the case any more on
82 # Python 3.4 thanks to the PEP 442.
Victor Stinner71080fc2015-07-25 02:23:21 +020083 if compat.PY34:
Victor Stinnera02f81f2014-06-24 22:37:53 +020084 def __del__(self):
Victor Stinner98b63912014-06-30 14:51:04 +020085 if self._state == futures._PENDING and self._log_destroy_pending:
Victor Stinner80f53aa2014-06-27 13:52:20 +020086 context = {
Victor Stinnera02f81f2014-06-24 22:37:53 +020087 'task': self,
88 'message': 'Task was destroyed but it is pending!',
Victor Stinner80f53aa2014-06-27 13:52:20 +020089 }
90 if self._source_traceback:
91 context['source_traceback'] = self._source_traceback
92 self._loop.call_exception_handler(context)
Victor Stinnera02f81f2014-06-24 22:37:53 +020093 futures.Future.__del__(self)
94
Victor Stinner313a9802014-07-29 12:58:23 +020095 def _repr_info(self):
96 info = super()._repr_info()
97
Victor Stinner975735f2014-06-25 21:41:58 +020098 if self._must_cancel:
Victor Stinner313a9802014-07-29 12:58:23 +020099 # replace status
100 info[0] = 'cancelling'
Victor Stinner975735f2014-06-25 21:41:58 +0200101
Victor Stinnerc39ba7d2014-07-11 00:21:27 +0200102 coro = coroutines._format_coroutine(self._coro)
Victor Stinner313a9802014-07-29 12:58:23 +0200103 info.insert(1, 'coro=<%s>' % coro)
Victor Stinner975735f2014-06-25 21:41:58 +0200104
Victor Stinner2dba23a2014-07-03 00:59:00 +0200105 if self._fut_waiter is not None:
Victor Stinner313a9802014-07-29 12:58:23 +0200106 info.insert(2, 'wait_for=%r' % self._fut_waiter)
107 return info
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700108
109 def get_stack(self, *, limit=None):
110 """Return the list of stack frames for this task's coroutine.
111
Victor Stinnerd87de832014-12-02 17:57:04 +0100112 If the coroutine is not done, this returns the stack where it is
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700113 suspended. If the coroutine has completed successfully or was
114 cancelled, this returns an empty list. If the coroutine was
115 terminated by an exception, this returns the list of traceback
116 frames.
117
118 The frames are always ordered from oldest to newest.
119
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500120 The optional limit gives the maximum number of frames to
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700121 return; by default all available frames are returned. Its
122 meaning differs depending on whether a stack or a traceback is
123 returned: the newest frames of a stack are returned, but the
124 oldest frames of a traceback are returned. (This matches the
125 behavior of the traceback module.)
126
127 For reasons beyond our control, only one stack frame is
128 returned for a suspended coroutine.
129 """
130 frames = []
Yury Selivanov23398332015-08-14 15:30:59 -0400131 try:
132 # 'async def' coroutines
133 f = self._coro.cr_frame
134 except AttributeError:
135 f = self._coro.gi_frame
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700136 if f is not None:
137 while f is not None:
138 if limit is not None:
139 if limit <= 0:
140 break
141 limit -= 1
142 frames.append(f)
143 f = f.f_back
144 frames.reverse()
145 elif self._exception is not None:
146 tb = self._exception.__traceback__
147 while tb is not None:
148 if limit is not None:
149 if limit <= 0:
150 break
151 limit -= 1
152 frames.append(tb.tb_frame)
153 tb = tb.tb_next
154 return frames
155
156 def print_stack(self, *, limit=None, file=None):
157 """Print the stack or traceback for this task's coroutine.
158
159 This produces output similar to that of the traceback module,
160 for the frames retrieved by get_stack(). The limit argument
161 is passed to get_stack(). The file argument is an I/O stream
R David Murray8e069d52014-09-24 13:13:45 -0400162 to which the output is written; by default output is written
163 to sys.stderr.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700164 """
165 extracted_list = []
166 checked = set()
167 for f in self.get_stack(limit=limit):
168 lineno = f.f_lineno
169 co = f.f_code
170 filename = co.co_filename
171 name = co.co_name
172 if filename not in checked:
173 checked.add(filename)
174 linecache.checkcache(filename)
175 line = linecache.getline(filename, lineno, f.f_globals)
176 extracted_list.append((filename, lineno, name, line))
177 exc = self._exception
178 if not extracted_list:
179 print('No stack for %r' % self, file=file)
180 elif exc is not None:
181 print('Traceback for %r (most recent call last):' % self,
182 file=file)
183 else:
184 print('Stack for %r (most recent call last):' % self,
185 file=file)
186 traceback.print_list(extracted_list, file=file)
187 if exc is not None:
188 for line in traceback.format_exception_only(exc.__class__, exc):
189 print(line, file=file, end='')
190
191 def cancel(self):
R David Murray8e069d52014-09-24 13:13:45 -0400192 """Request that this task cancel itself.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200193
Victor Stinner8d213572014-06-02 23:06:46 +0200194 This arranges for a CancelledError to be thrown into the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200195 wrapped coroutine on the next cycle through the event loop.
196 The coroutine then has a chance to clean up or even deny
197 the request using try/except/finally.
198
R David Murray8e069d52014-09-24 13:13:45 -0400199 Unlike Future.cancel, this does not guarantee that the
Victor Stinner4bd652a2014-04-07 11:18:06 +0200200 task will be cancelled: the exception might be caught and
R David Murray8e069d52014-09-24 13:13:45 -0400201 acted upon, delaying cancellation of the task or preventing
202 cancellation completely. The task may also return a value or
203 raise a different exception.
Victor Stinner4bd652a2014-04-07 11:18:06 +0200204
205 Immediately after this method is called, Task.cancelled() will
206 not return True (unless the task was already cancelled). A
207 task will be marked as cancelled when the wrapped coroutine
208 terminates with a CancelledError exception (even if cancel()
209 was not called).
210 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700211 if self.done():
212 return False
213 if self._fut_waiter is not None:
214 if self._fut_waiter.cancel():
215 # Leave self._fut_waiter; it may be a Task that
216 # catches and ignores the cancellation so we may have
217 # to cancel it again later.
218 return True
219 # It must be the case that self._step is already scheduled.
220 self._must_cancel = True
221 return True
222
Yury Selivanovd59bba82015-11-20 12:41:03 -0500223 def _step(self, exc=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700224 assert not self.done(), \
Yury Selivanovd59bba82015-11-20 12:41:03 -0500225 '_step(): already done: {!r}, {!r}'.format(self, exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700226 if self._must_cancel:
227 if not isinstance(exc, futures.CancelledError):
228 exc = futures.CancelledError()
229 self._must_cancel = False
230 coro = self._coro
231 self._fut_waiter = None
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800232
233 self.__class__._current_tasks[self._loop] = self
Yury Selivanovd59bba82015-11-20 12:41:03 -0500234 # Call either coro.throw(exc) or coro.send(None).
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700235 try:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500236 if exc is None:
237 # We use the `send` method directly, because coroutines
238 # don't have `__iter__` and `__next__` methods.
239 result = coro.send(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700240 else:
Yury Selivanovd59bba82015-11-20 12:41:03 -0500241 result = coro.throw(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700242 except StopIteration as exc:
243 self.set_result(exc.value)
Yury Selivanov4145c832016-10-09 12:19:12 -0400244 except futures.CancelledError:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700245 super().cancel() # I.e., Future.cancel(self).
246 except Exception as exc:
247 self.set_exception(exc)
248 except BaseException as exc:
249 self.set_exception(exc)
250 raise
251 else:
Guido van Rossum1140a032016-09-09 12:54:54 -0700252 blocking = getattr(result, '_asyncio_future_blocking', None)
253 if blocking is not None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700254 # Yielded Future must come from Future.__iter__().
Yury Selivanov0ac3a0c2015-12-11 11:33:59 -0500255 if result._loop is not self._loop:
256 self._loop.call_soon(
257 self._step,
258 RuntimeError(
259 'Task {!r} got Future {!r} attached to a '
260 'different loop'.format(self, result)))
Guido van Rossum1140a032016-09-09 12:54:54 -0700261 elif blocking:
Yury Selivanov4145c832016-10-09 12:19:12 -0400262 if result is self:
263 self._loop.call_soon(
264 self._step,
265 RuntimeError(
266 'Task cannot await on itself: {!r}'.format(
267 self)))
268 else:
269 result._asyncio_future_blocking = False
270 result.add_done_callback(self._wakeup)
271 self._fut_waiter = result
272 if self._must_cancel:
273 if self._fut_waiter.cancel():
274 self._must_cancel = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700275 else:
276 self._loop.call_soon(
Yury Selivanovd59bba82015-11-20 12:41:03 -0500277 self._step,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700278 RuntimeError(
279 'yield was used instead of yield from '
280 'in task {!r} with {!r}'.format(self, result)))
281 elif result is None:
282 # Bare yield relinquishes control for one event loop iteration.
283 self._loop.call_soon(self._step)
284 elif inspect.isgenerator(result):
285 # Yielding a generator is just wrong.
286 self._loop.call_soon(
Yury Selivanovd59bba82015-11-20 12:41:03 -0500287 self._step,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700288 RuntimeError(
289 'yield was used instead of yield from for '
290 'generator in task {!r} with {}'.format(
291 self, result)))
292 else:
293 # Yielding something else is an error.
294 self._loop.call_soon(
Yury Selivanovd59bba82015-11-20 12:41:03 -0500295 self._step,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700296 RuntimeError(
297 'Task got bad yield: {!r}'.format(result)))
Guido van Rossum1a605ed2013-12-06 12:57:40 -0800298 finally:
299 self.__class__._current_tasks.pop(self._loop)
Victor Stinnerd74ac822014-03-04 23:07:08 +0100300 self = None # Needed to break cycles when an exception occurs.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700301
302 def _wakeup(self, future):
303 try:
Yury Selivanova4afc482015-11-16 15:12:10 -0500304 future.result()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700305 except Exception as exc:
306 # This may also be a cancellation.
Yury Selivanovd59bba82015-11-20 12:41:03 -0500307 self._step(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700308 else:
Yury Selivanova4afc482015-11-16 15:12:10 -0500309 # Don't pass the value of `future.result()` explicitly,
310 # as `Future.__iter__` and `Future.__await__` don't need it.
311 # If we call `_step(value, None)` instead of `_step()`,
312 # Python eval loop would use `.send(value)` method call,
313 # instead of `__next__()`, which is slower for futures
314 # that return non-generator iterators from their `__iter__`.
315 self._step()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700316 self = None # Needed to break cycles when an exception occurs.
317
318
319# wait() and as_completed() similar to those in PEP 3148.
320
321FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
322FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
323ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
324
325
326@coroutine
327def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
328 """Wait for the Futures and coroutines given by fs to complete.
329
Victor Stinnerdb74d982014-06-10 11:16:05 +0200330 The sequence futures must not be empty.
331
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700332 Coroutines will be wrapped in Tasks.
333
334 Returns two sets of Future: (done, pending).
335
336 Usage:
337
338 done, pending = yield from asyncio.wait(fs)
339
340 Note: This does not raise TimeoutError! Futures that aren't done
341 when the timeout occurs are returned in the second set.
342 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700343 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100344 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700345 if not fs:
346 raise ValueError('Set of coroutines/Futures is empty.')
Victor Stinnere931f7b2014-07-16 18:50:39 +0200347 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
348 raise ValueError('Invalid return_when value: {}'.format(return_when))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700349
350 if loop is None:
351 loop = events.get_event_loop()
352
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400353 fs = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700354
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700355 return (yield from _wait(fs, timeout, return_when, loop))
356
357
Victor Stinner59e08022014-08-28 11:19:25 +0200358def _release_waiter(waiter, *args):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700359 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200360 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700361
362
363@coroutine
364def wait_for(fut, timeout, *, loop=None):
365 """Wait for the single Future or coroutine to complete, with timeout.
366
367 Coroutine will be wrapped in Task.
368
Victor Stinner421e49b2014-01-23 17:40:59 +0100369 Returns result of the Future or coroutine. When a timeout occurs,
370 it cancels the task and raises TimeoutError. To avoid the task
371 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700372
Victor Stinner922bc2c2015-01-15 16:29:10 +0100373 If the wait is cancelled, the task is also cancelled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700374
Victor Stinner922bc2c2015-01-15 16:29:10 +0100375 This function is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700376 """
377 if loop is None:
378 loop = events.get_event_loop()
379
Guido van Rossum48c66c32014-01-29 14:30:38 -0800380 if timeout is None:
381 return (yield from fut)
382
Yury Selivanov7661db62016-05-16 15:38:39 -0400383 waiter = loop.create_future()
Victor Stinner59e08022014-08-28 11:19:25 +0200384 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
385 cb = functools.partial(_release_waiter, waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700386
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400387 fut = ensure_future(fut, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700388 fut.add_done_callback(cb)
389
390 try:
Victor Stinner59e08022014-08-28 11:19:25 +0200391 # wait until the future completes or the timeout
Victor Stinner922bc2c2015-01-15 16:29:10 +0100392 try:
393 yield from waiter
394 except futures.CancelledError:
395 fut.remove_done_callback(cb)
396 fut.cancel()
397 raise
Victor Stinner59e08022014-08-28 11:19:25 +0200398
399 if fut.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700400 return fut.result()
401 else:
402 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100403 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700404 raise futures.TimeoutError()
405 finally:
406 timeout_handle.cancel()
407
408
409@coroutine
410def _wait(fs, timeout, return_when, loop):
Victor Stinner2ba8ece2016-04-01 21:39:09 +0200411 """Internal helper for wait() and wait_for().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700412
413 The fs argument must be a collection of Futures.
414 """
415 assert fs, 'Set of Futures is empty.'
Yury Selivanov7661db62016-05-16 15:38:39 -0400416 waiter = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700417 timeout_handle = None
418 if timeout is not None:
419 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
420 counter = len(fs)
421
422 def _on_completion(f):
423 nonlocal counter
424 counter -= 1
425 if (counter <= 0 or
426 return_when == FIRST_COMPLETED or
427 return_when == FIRST_EXCEPTION and (not f.cancelled() and
428 f.exception() is not None)):
429 if timeout_handle is not None:
430 timeout_handle.cancel()
431 if not waiter.done():
Victor Stinner59e08022014-08-28 11:19:25 +0200432 waiter.set_result(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700433
434 for f in fs:
435 f.add_done_callback(_on_completion)
436
437 try:
438 yield from waiter
439 finally:
440 if timeout_handle is not None:
441 timeout_handle.cancel()
442
443 done, pending = set(), set()
444 for f in fs:
445 f.remove_done_callback(_on_completion)
446 if f.done():
447 done.add(f)
448 else:
449 pending.add(f)
450 return done, pending
451
452
453# This is *not* a @coroutine! It is just an iterator (yielding Futures).
454def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossumb58f0532014-02-12 17:58:19 -0800455 """Return an iterator whose values are coroutines.
456
457 When waiting for the yielded coroutines you'll get the results (or
458 exceptions!) of the original Futures (or coroutines), in the order
459 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700460
461 This differs from PEP 3148; the proper way to use this is:
462
463 for f in as_completed(fs):
464 result = yield from f # The 'yield from' may raise.
465 # Use result.
466
Guido van Rossumb58f0532014-02-12 17:58:19 -0800467 If a timeout is specified, the 'yield from' will raise
468 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700469
470 Note: The futures 'f' are not necessarily members of fs.
471 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700472 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
Victor Stinnereb748762014-02-11 11:54:08 +0100473 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700474 loop = loop if loop is not None else events.get_event_loop()
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400475 todo = {ensure_future(f, loop=loop) for f in set(fs)}
Guido van Rossumb58f0532014-02-12 17:58:19 -0800476 from .queues import Queue # Import here to avoid circular import problem.
477 done = Queue(loop=loop)
478 timeout_handle = None
479
480 def _on_timeout():
481 for f in todo:
482 f.remove_done_callback(_on_completion)
483 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
484 todo.clear() # Can't do todo.remove(f) in the loop.
485
486 def _on_completion(f):
487 if not todo:
488 return # _on_timeout() was here first.
489 todo.remove(f)
490 done.put_nowait(f)
491 if not todo and timeout_handle is not None:
492 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700493
494 @coroutine
495 def _wait_for_one():
Guido van Rossumb58f0532014-02-12 17:58:19 -0800496 f = yield from done.get()
497 if f is None:
498 # Dummy value from _on_timeout().
499 raise futures.TimeoutError
500 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700501
Guido van Rossumb58f0532014-02-12 17:58:19 -0800502 for f in todo:
503 f.add_done_callback(_on_completion)
504 if todo and timeout is not None:
505 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700506 for _ in range(len(todo)):
507 yield _wait_for_one()
508
509
510@coroutine
511def sleep(delay, result=None, *, loop=None):
512 """Coroutine that completes after a given time (in seconds)."""
Yury Selivanovade04122015-11-05 14:29:04 -0500513 if delay == 0:
514 yield
515 return result
516
Yury Selivanov7661db62016-05-16 15:38:39 -0400517 if loop is None:
518 loop = events.get_event_loop()
519 future = loop.create_future()
Victor Stinnera9acbe82014-07-05 15:29:41 +0200520 h = future._loop.call_later(delay,
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500521 futures._set_result_unless_cancelled,
522 future, result)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700523 try:
524 return (yield from future)
525 finally:
526 h.cancel()
527
528
Yury Selivanov4357cf62016-09-15 13:49:08 -0400529def async_(coro_or_future, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700530 """Wrap a coroutine in a future.
531
532 If the argument is a Future, it is returned directly.
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400533
534 This function is deprecated in 3.5. Use asyncio.ensure_future() instead.
535 """
536
537 warnings.warn("asyncio.async() function is deprecated, use ensure_future()",
538 DeprecationWarning)
539
540 return ensure_future(coro_or_future, loop=loop)
541
Yury Selivanov4357cf62016-09-15 13:49:08 -0400542# Silence DeprecationWarning:
543globals()['async'] = async_
544async_.__name__ = 'async'
545del async_
546
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400547
548def ensure_future(coro_or_future, *, loop=None):
Yury Selivanov620279b2015-10-02 15:00:19 -0400549 """Wrap a coroutine or an awaitable in a future.
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400550
551 If the argument is a Future, it is returned directly.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700552 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700553 if futures.isfuture(coro_or_future):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700554 if loop is not None and loop is not coro_or_future._loop:
555 raise ValueError('loop argument must agree with Future')
556 return coro_or_future
Victor Stinnerf951d282014-06-29 00:46:45 +0200557 elif coroutines.iscoroutine(coro_or_future):
Victor Stinner896a25a2014-07-08 11:29:25 +0200558 if loop is None:
559 loop = events.get_event_loop()
560 task = loop.create_task(coro_or_future)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200561 if task._source_traceback:
562 del task._source_traceback[-1]
563 return task
Yury Selivanov620279b2015-10-02 15:00:19 -0400564 elif compat.PY35 and inspect.isawaitable(coro_or_future):
565 return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700566 else:
Yury Selivanov620279b2015-10-02 15:00:19 -0400567 raise TypeError('A Future, a coroutine or an awaitable is required')
568
569
570@coroutine
571def _wrap_awaitable(awaitable):
572 """Helper for asyncio.ensure_future().
573
574 Wraps awaitable (an object with __await__) into a coroutine
575 that will later be wrapped in a Task by ensure_future().
576 """
577 return (yield from awaitable.__await__())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700578
579
580class _GatheringFuture(futures.Future):
581 """Helper for gather().
582
583 This overrides cancel() to cancel all the children and act more
584 like Task.cancel(), which doesn't immediately mark itself as
585 cancelled.
586 """
587
588 def __init__(self, children, *, loop=None):
589 super().__init__(loop=loop)
590 self._children = children
591
592 def cancel(self):
593 if self.done():
594 return False
Yury Selivanov3d676152016-10-21 17:22:17 -0400595 ret = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700596 for child in self._children:
Yury Selivanov3d676152016-10-21 17:22:17 -0400597 if child.cancel():
598 ret = True
599 return ret
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700600
601
602def gather(*coros_or_futures, loop=None, return_exceptions=False):
603 """Return a future aggregating results from the given coroutines
604 or futures.
605
Guido van Rossume3c65a72016-09-30 08:17:15 -0700606 Coroutines will be wrapped in a future and scheduled in the event
607 loop. They will not necessarily be scheduled in the same order as
608 passed in.
609
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700610 All futures must share the same event loop. If all the tasks are
611 done successfully, the returned future's result is the list of
612 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500613 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700614 exceptions in the tasks are treated the same as successful
615 results, and gathered in the result list; otherwise, the first
616 raised exception will be immediately propagated to the returned
617 future.
618
619 Cancellation: if the outer Future is cancelled, all children (that
620 have not completed yet) are also cancelled. If any child is
621 cancelled, this is treated as if it raised CancelledError --
622 the outer Future is *not* cancelled in this case. (This is to
623 prevent the cancellation of one child to cause other children to
624 be cancelled.)
625 """
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200626 if not coros_or_futures:
Yury Selivanov7661db62016-05-16 15:38:39 -0400627 if loop is None:
628 loop = events.get_event_loop()
629 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700630 outer.set_result([])
631 return outer
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200632
633 arg_to_fut = {}
634 for arg in set(coros_or_futures):
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700635 if not futures.isfuture(arg):
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400636 fut = ensure_future(arg, loop=loop)
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200637 if loop is None:
638 loop = fut._loop
639 # The caller cannot control this future, the "destroy pending task"
640 # warning should not be emitted.
641 fut._log_destroy_pending = False
642 else:
643 fut = arg
644 if loop is None:
645 loop = fut._loop
646 elif fut._loop is not loop:
647 raise ValueError("futures are tied to different event loops")
648 arg_to_fut[arg] = fut
649
650 children = [arg_to_fut[arg] for arg in coros_or_futures]
651 nchildren = len(children)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700652 outer = _GatheringFuture(children, loop=loop)
653 nfinished = 0
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200654 results = [None] * nchildren
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700655
656 def _done_callback(i, fut):
657 nonlocal nfinished
Victor Stinner3531d902015-01-09 01:42:52 +0100658 if outer.done():
659 if not fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700660 # Mark exception retrieved.
661 fut.exception()
662 return
Victor Stinner3531d902015-01-09 01:42:52 +0100663
Victor Stinner29342622015-01-29 14:15:19 +0100664 if fut.cancelled():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700665 res = futures.CancelledError()
666 if not return_exceptions:
667 outer.set_exception(res)
668 return
669 elif fut._exception is not None:
670 res = fut.exception() # Mark exception retrieved.
671 if not return_exceptions:
672 outer.set_exception(res)
673 return
674 else:
675 res = fut._result
676 results[i] = res
677 nfinished += 1
Victor Stinnerf03b3c72014-07-16 18:36:24 +0200678 if nfinished == nchildren:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700679 outer.set_result(results)
680
681 for i, fut in enumerate(children):
682 fut.add_done_callback(functools.partial(_done_callback, i))
683 return outer
684
685
686def shield(arg, *, loop=None):
687 """Wait for a future, shielding it from cancellation.
688
689 The statement
690
691 res = yield from shield(something())
692
693 is exactly equivalent to the statement
694
695 res = yield from something()
696
697 *except* that if the coroutine containing it is cancelled, the
698 task running in something() is not cancelled. From the POV of
699 something(), the cancellation did not happen. But its caller is
700 still cancelled, so the yield-from expression still raises
701 CancelledError. Note: If something() is cancelled by other means
702 this will still cancel shield().
703
704 If you want to completely ignore cancellation (not recommended)
705 you can combine shield() with a try/except clause, as follows:
706
707 try:
708 res = yield from shield(something())
709 except CancelledError:
710 res = None
711 """
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400712 inner = ensure_future(arg, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700713 if inner.done():
714 # Shortcut.
715 return inner
716 loop = inner._loop
Yury Selivanov7661db62016-05-16 15:38:39 -0400717 outer = loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700718
719 def _done_callback(inner):
720 if outer.cancelled():
Victor Stinner3531d902015-01-09 01:42:52 +0100721 if not inner.cancelled():
722 # Mark inner's result as retrieved.
723 inner.exception()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700724 return
Victor Stinner3531d902015-01-09 01:42:52 +0100725
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700726 if inner.cancelled():
727 outer.cancel()
728 else:
729 exc = inner.exception()
730 if exc is not None:
731 outer.set_exception(exc)
732 else:
733 outer.set_result(inner.result())
734
735 inner.add_done_callback(_done_callback)
736 return outer
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700737
738
739def run_coroutine_threadsafe(coro, loop):
740 """Submit a coroutine object to a given event loop.
741
742 Return a concurrent.futures.Future to access the result.
743 """
744 if not coroutines.iscoroutine(coro):
745 raise TypeError('A coroutine object is required')
746 future = concurrent.futures.Future()
747
748 def callback():
Guido van Rossum601953b2015-10-05 16:20:00 -0700749 try:
750 futures._chain_future(ensure_future(coro, loop=loop), future)
751 except Exception as exc:
752 if future.set_running_or_notify_cancel():
753 future.set_exception(exc)
754 raise
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700755
756 loop.call_soon_threadsafe(callback)
757 return future