blob: 73215f50efe95d82d9dd6e1222bbf19ce63ceb30 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""A Future class similar to the one in PEP 3148."""
2
3__all__ = ['CancelledError', 'TimeoutError',
4 'InvalidStateError',
5 'Future', 'wrap_future',
6 ]
7
8import concurrent.futures._base
9import logging
Victor Stinner313a9802014-07-29 12:58:23 +020010import reprlib
Victor Stinner4c3c6992013-12-19 22:42:40 +010011import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import traceback
13
Victor Stinner71080fc2015-07-25 02:23:21 +020014from . import compat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016
17# States for Future.
18_PENDING = 'PENDING'
19_CANCELLED = 'CANCELLED'
20_FINISHED = 'FINISHED'
21
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022Error = concurrent.futures._base.Error
23CancelledError = concurrent.futures.CancelledError
24TimeoutError = concurrent.futures.TimeoutError
25
26STACK_DEBUG = logging.DEBUG - 1 # heavy-duty debugging
27
28
29class InvalidStateError(Error):
30 """The operation is not allowed in this state."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
32
33class _TracebackLogger:
34 """Helper to log a traceback upon destruction if not cleared.
35
36 This solves a nasty problem with Futures and Tasks that have an
37 exception set: if nobody asks for the exception, the exception is
38 never logged. This violates the Zen of Python: 'Errors should
39 never pass silently. Unless explicitly silenced.'
40
41 However, we don't want to log the exception as soon as
42 set_exception() is called: if the calling code is written
43 properly, it will get the exception and handle it properly. But
44 we *do* want to log it if result() or exception() was never called
45 -- otherwise developers waste a lot of time wondering why their
46 buggy code fails silently.
47
48 An earlier attempt added a __del__() method to the Future class
49 itself, but this backfired because the presence of __del__()
50 prevents garbage collection from breaking cycles. A way out of
51 this catch-22 is to avoid having a __del__() method on the Future
52 class itself, but instead to have a reference to a helper object
53 with a __del__() method that logs the traceback, where we ensure
54 that the helper object doesn't participate in cycles, and only the
55 Future has a reference to it.
56
57 The helper object is added when set_exception() is called. When
58 the Future is collected, and the helper is present, the helper
59 object is also collected, and its __del__() method will log the
60 traceback. When the Future's result() or exception() method is
Serhiy Storchaka56a6d852014-12-01 18:28:43 +020061 called (and a helper object is present), it removes the helper
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070062 object, after calling its clear() method to prevent it from
63 logging.
64
65 One downside is that we do a fair amount of work to extract the
66 traceback from the exception, even when it is never logged. It
67 would seem cheaper to just store the exception object, but that
68 references the traceback, which references stack frames, which may
69 reference the Future, which references the _TracebackLogger, and
70 then the _TracebackLogger would be included in a cycle, which is
71 what we're trying to avoid! As an optimization, we don't
72 immediately format the exception; we only do the work when
73 activate() is called, which call is delayed until after all the
74 Future's callbacks have run. Since usually a Future has at least
75 one callback (typically set by 'yield from') and usually that
76 callback extracts the callback, thereby removing the need to
77 format the exception.
78
79 PS. I don't claim credit for this solution. I first heard of it
80 in a discussion about closing files when they are collected.
81 """
82
Victor Stinner80f53aa2014-06-27 13:52:20 +020083 __slots__ = ('loop', 'source_traceback', 'exc', 'tb')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070084
Victor Stinner80f53aa2014-06-27 13:52:20 +020085 def __init__(self, future, exc):
86 self.loop = future._loop
87 self.source_traceback = future._source_traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070088 self.exc = exc
89 self.tb = None
90
91 def activate(self):
92 exc = self.exc
93 if exc is not None:
94 self.exc = None
95 self.tb = traceback.format_exception(exc.__class__, exc,
96 exc.__traceback__)
97
98 def clear(self):
99 self.exc = None
100 self.tb = None
101
102 def __del__(self):
103 if self.tb:
Victor Stinner662fd5f2014-11-20 14:16:31 +0100104 msg = 'Future/Task exception was never retrieved\n'
Victor Stinner80f53aa2014-06-27 13:52:20 +0200105 if self.source_traceback:
Victor Stinner662fd5f2014-11-20 14:16:31 +0100106 src = ''.join(traceback.format_list(self.source_traceback))
107 msg += 'Future/Task created at (most recent call last):\n'
108 msg += '%s\n' % src.rstrip()
Victor Stinner80f53aa2014-06-27 13:52:20 +0200109 msg += ''.join(self.tb).rstrip()
110 self.loop.call_exception_handler({'message': msg})
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700111
112
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700113def isfuture(obj):
114 """Check for a Future.
115
116 This returns True when obj is a Future instance or is advertising
117 itself as duck-type compatible by setting _asyncio_future_blocking.
118 See comment in Future for more details.
119 """
120 return getattr(obj, '_asyncio_future_blocking', None) is not None
121
122
INADA Naoki9e4e38e2016-10-09 14:44:47 +0900123def _format_callbacks(cb):
124 """helper function for Future.__repr__"""
125 size = len(cb)
126 if not size:
127 cb = ''
128
129 def format_cb(callback):
130 return events._format_callback_source(callback, ())
131
132 if size == 1:
133 cb = format_cb(cb[0])
134 elif size == 2:
135 cb = '{}, {}'.format(format_cb(cb[0]), format_cb(cb[1]))
136 elif size > 2:
137 cb = '{}, <{} more>, {}'.format(format_cb(cb[0]),
138 size-2,
139 format_cb(cb[-1]))
140 return 'cb=[%s]' % cb
141
142
143def _future_repr_info(future):
144 # (Future) -> str
145 """helper function for Future.__repr__"""
146 info = [future._state.lower()]
147 if future._state == _FINISHED:
148 if future._exception is not None:
149 info.append('exception={!r}'.format(future._exception))
150 else:
151 # use reprlib to limit the length of the output, especially
152 # for very long strings
153 result = reprlib.repr(future._result)
154 info.append('result={}'.format(result))
155 if future._callbacks:
156 info.append(_format_callbacks(future._callbacks))
157 if future._source_traceback:
158 frame = future._source_traceback[-1]
159 info.append('created at %s:%s' % (frame[0], frame[1]))
160 return info
161
162
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700163class Future:
164 """This class is *almost* compatible with concurrent.futures.Future.
165
166 Differences:
167
168 - result() and exception() do not take a timeout argument and
169 raise an exception when the future isn't done yet.
170
171 - Callbacks registered with add_done_callback() are always called
172 via the event loop's call_soon_threadsafe().
173
174 - This class is not compatible with the wait() and as_completed()
175 methods in the concurrent.futures package.
176
177 (In Python 3.4 or later we may be able to unify the implementations.)
178 """
179
180 # Class variables serving as defaults for instance variables.
181 _state = _PENDING
182 _result = None
183 _exception = None
184 _loop = None
Victor Stinnerfe22e092014-12-04 23:00:13 +0100185 _source_traceback = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700186
Guido van Rossum1140a032016-09-09 12:54:54 -0700187 # This field is used for a dual purpose:
188 # - Its presence is a marker to declare that a class implements
189 # the Future protocol (i.e. is intended to be duck-type compatible).
190 # The value must also be not-None, to enable a subclass to declare
191 # that it is not compatible by setting this to None.
192 # - It is set by __iter__() below so that Task._step() can tell
193 # the difference between `yield from Future()` (correct) vs.
194 # `yield Future()` (incorrect).
195 _asyncio_future_blocking = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700196
Victor Stinnere40c0782013-12-21 00:19:33 +0100197 _log_traceback = False # Used for Python 3.4 and later
198 _tb_logger = None # Used for Python 3.3 only
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700199
200 def __init__(self, *, loop=None):
201 """Initialize the future.
202
Martin Panterc04fb562016-02-10 05:44:01 +0000203 The optional event_loop argument allows explicitly setting the event
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700204 loop object used by the future. If it's not provided, the future uses
205 the default event loop.
206 """
207 if loop is None:
208 self._loop = events.get_event_loop()
209 else:
210 self._loop = loop
211 self._callbacks = []
Victor Stinner80f53aa2014-06-27 13:52:20 +0200212 if self._loop.get_debug():
213 self._source_traceback = traceback.extract_stack(sys._getframe(1))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700214
INADA Naoki9e4e38e2016-10-09 14:44:47 +0900215 _repr_info = _future_repr_info
Victor Stinner313a9802014-07-29 12:58:23 +0200216
217 def __repr__(self):
INADA Naoki9e4e38e2016-10-09 14:44:47 +0900218 return '<%s %s>' % (self.__class__.__name__, ' '.join(self._repr_info()))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700219
Victor Stinner978a9af2015-01-29 17:50:58 +0100220 # On Python 3.3 and older, objects with a destructor part of a reference
221 # cycle are never destroyed. It's not more the case on Python 3.4 thanks
222 # to the PEP 442.
Victor Stinner71080fc2015-07-25 02:23:21 +0200223 if compat.PY34:
Victor Stinner4c3c6992013-12-19 22:42:40 +0100224 def __del__(self):
Victor Stinnere40c0782013-12-21 00:19:33 +0100225 if not self._log_traceback:
226 # set_exception() was not called, or result() or exception()
227 # has consumed the exception
228 return
229 exc = self._exception
Yury Selivanov569efa22014-02-18 18:02:19 -0500230 context = {
Victor Stinner80f53aa2014-06-27 13:52:20 +0200231 'message': ('%s exception was never retrieved'
232 % self.__class__.__name__),
Yury Selivanov569efa22014-02-18 18:02:19 -0500233 'exception': exc,
234 'future': self,
235 }
Victor Stinner80f53aa2014-06-27 13:52:20 +0200236 if self._source_traceback:
237 context['source_traceback'] = self._source_traceback
Yury Selivanov569efa22014-02-18 18:02:19 -0500238 self._loop.call_exception_handler(context)
Victor Stinner4c3c6992013-12-19 22:42:40 +0100239
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700240 def cancel(self):
241 """Cancel the future and schedule callbacks.
242
243 If the future is already done or cancelled, return False. Otherwise,
244 change the future's state to cancelled, schedule the callbacks and
245 return True.
246 """
247 if self._state != _PENDING:
248 return False
249 self._state = _CANCELLED
250 self._schedule_callbacks()
251 return True
252
253 def _schedule_callbacks(self):
254 """Internal: Ask the event loop to call all callbacks.
255
256 The callbacks are scheduled to be called as soon as possible. Also
257 clears the callback list.
258 """
259 callbacks = self._callbacks[:]
260 if not callbacks:
261 return
262
263 self._callbacks[:] = []
264 for callback in callbacks:
265 self._loop.call_soon(callback, self)
266
267 def cancelled(self):
268 """Return True if the future was cancelled."""
269 return self._state == _CANCELLED
270
271 # Don't implement running(); see http://bugs.python.org/issue18699
272
273 def done(self):
274 """Return True if the future is done.
275
276 Done means either that a result / exception are available, or that the
277 future was cancelled.
278 """
279 return self._state != _PENDING
280
281 def result(self):
282 """Return the result this future represents.
283
284 If the future has been cancelled, raises CancelledError. If the
285 future's result isn't yet available, raises InvalidStateError. If
286 the future is done and has an exception set, this exception is raised.
287 """
288 if self._state == _CANCELLED:
289 raise CancelledError
290 if self._state != _FINISHED:
291 raise InvalidStateError('Result is not ready.')
Victor Stinnere40c0782013-12-21 00:19:33 +0100292 self._log_traceback = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700293 if self._tb_logger is not None:
294 self._tb_logger.clear()
Victor Stinnere40c0782013-12-21 00:19:33 +0100295 self._tb_logger = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700296 if self._exception is not None:
297 raise self._exception
298 return self._result
299
300 def exception(self):
301 """Return the exception that was set on this future.
302
303 The exception (or None if no exception was set) is returned only if
304 the future is done. If the future has been cancelled, raises
305 CancelledError. If the future isn't done yet, raises
306 InvalidStateError.
307 """
308 if self._state == _CANCELLED:
309 raise CancelledError
310 if self._state != _FINISHED:
311 raise InvalidStateError('Exception is not set.')
Victor Stinnere40c0782013-12-21 00:19:33 +0100312 self._log_traceback = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700313 if self._tb_logger is not None:
314 self._tb_logger.clear()
Victor Stinnere40c0782013-12-21 00:19:33 +0100315 self._tb_logger = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700316 return self._exception
317
318 def add_done_callback(self, fn):
319 """Add a callback to be run when the future becomes done.
320
321 The callback is called with a single argument - the future object. If
322 the future is already done when this is called, the callback is
323 scheduled with call_soon.
324 """
325 if self._state != _PENDING:
326 self._loop.call_soon(fn, self)
327 else:
328 self._callbacks.append(fn)
329
330 # New method not in PEP 3148.
331
332 def remove_done_callback(self, fn):
333 """Remove all instances of a callback from the "call when done" list.
334
335 Returns the number of callbacks removed.
336 """
337 filtered_callbacks = [f for f in self._callbacks if f != fn]
338 removed_count = len(self._callbacks) - len(filtered_callbacks)
339 if removed_count:
340 self._callbacks[:] = filtered_callbacks
341 return removed_count
342
343 # So-called internal methods (note: no set_running_or_notify_cancel()).
344
345 def set_result(self, result):
346 """Mark the future done and set its result.
347
348 If the future is already done when this method is called, raises
349 InvalidStateError.
350 """
351 if self._state != _PENDING:
352 raise InvalidStateError('{}: {!r}'.format(self._state, self))
353 self._result = result
354 self._state = _FINISHED
355 self._schedule_callbacks()
356
357 def set_exception(self, exception):
358 """Mark the future done and set an exception.
359
360 If the future is already done when this method is called, raises
361 InvalidStateError.
362 """
363 if self._state != _PENDING:
364 raise InvalidStateError('{}: {!r}'.format(self._state, self))
Victor Stinner95728982014-01-30 16:01:54 -0800365 if isinstance(exception, type):
366 exception = exception()
Yury Selivanov1bd03072016-03-02 11:03:28 -0500367 if type(exception) is StopIteration:
368 raise TypeError("StopIteration interacts badly with generators "
369 "and cannot be raised into a Future")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700370 self._exception = exception
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700371 self._state = _FINISHED
372 self._schedule_callbacks()
Victor Stinner71080fc2015-07-25 02:23:21 +0200373 if compat.PY34:
Victor Stinnere40c0782013-12-21 00:19:33 +0100374 self._log_traceback = True
Victor Stinner4c3c6992013-12-19 22:42:40 +0100375 else:
Victor Stinner80f53aa2014-06-27 13:52:20 +0200376 self._tb_logger = _TracebackLogger(self, exception)
Victor Stinner4c3c6992013-12-19 22:42:40 +0100377 # Arrange for the logger to be activated after all callbacks
378 # have had a chance to call result() or exception().
379 self._loop.call_soon(self._tb_logger.activate)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700380
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700381 def __iter__(self):
382 if not self.done():
Guido van Rossum1140a032016-09-09 12:54:54 -0700383 self._asyncio_future_blocking = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700384 yield self # This tells Task to wait for completion.
385 assert self.done(), "yield from wasn't used with future"
386 return self.result() # May raise too.
387
Victor Stinner71080fc2015-07-25 02:23:21 +0200388 if compat.PY35:
Yury Selivanov1af2bf72015-05-11 22:27:25 -0400389 __await__ = __iter__ # make compatible with 'await' expression
390
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700391
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500392def _set_result_unless_cancelled(fut, result):
393 """Helper setting the result only if the future was not cancelled."""
394 if fut.cancelled():
395 return
396 fut.set_result(result)
397
398
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700399def _set_concurrent_future_state(concurrent, source):
400 """Copy state from a future to a concurrent.futures.Future."""
401 assert source.done()
402 if source.cancelled():
403 concurrent.cancel()
404 if not concurrent.set_running_or_notify_cancel():
405 return
406 exception = source.exception()
407 if exception is not None:
408 concurrent.set_exception(exception)
409 else:
410 result = source.result()
411 concurrent.set_result(result)
412
413
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500414def _copy_future_state(source, dest):
415 """Internal helper to copy state from another Future.
416
417 The other Future may be a concurrent.futures.Future.
418 """
419 assert source.done()
420 if dest.cancelled():
421 return
422 assert not dest.done()
423 if source.cancelled():
424 dest.cancel()
425 else:
426 exception = source.exception()
427 if exception is not None:
428 dest.set_exception(exception)
429 else:
430 result = source.result()
431 dest.set_result(result)
432
433
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700434def _chain_future(source, destination):
435 """Chain two futures so that when one completes, so does the other.
436
437 The result (or exception) of source will be copied to destination.
438 If destination is cancelled, source gets cancelled too.
439 Compatible with both asyncio.Future and concurrent.futures.Future.
440 """
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700441 if not isfuture(source) and not isinstance(source,
442 concurrent.futures.Future):
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700443 raise TypeError('A future is required for source argument')
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700444 if not isfuture(destination) and not isinstance(destination,
445 concurrent.futures.Future):
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700446 raise TypeError('A future is required for destination argument')
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700447 source_loop = source._loop if isfuture(source) else None
448 dest_loop = destination._loop if isfuture(destination) else None
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700449
450 def _set_state(future, other):
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700451 if isfuture(future):
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500452 _copy_future_state(other, future)
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700453 else:
454 _set_concurrent_future_state(future, other)
455
456 def _call_check_cancel(destination):
457 if destination.cancelled():
458 if source_loop is None or source_loop is dest_loop:
459 source.cancel()
460 else:
461 source_loop.call_soon_threadsafe(source.cancel)
462
463 def _call_set_state(source):
464 if dest_loop is None or dest_loop is source_loop:
465 _set_state(destination, source)
466 else:
467 dest_loop.call_soon_threadsafe(_set_state, destination, source)
468
469 destination.add_done_callback(_call_check_cancel)
470 source.add_done_callback(_call_set_state)
471
472
473def wrap_future(future, *, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700474 """Wrap concurrent.futures.Future object."""
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700475 if isfuture(future):
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700476 return future
477 assert isinstance(future, concurrent.futures.Future), \
478 'concurrent.futures.Future is expected, got {!r}'.format(future)
Yury Selivanov7661db62016-05-16 15:38:39 -0400479 if loop is None:
480 loop = events.get_event_loop()
481 new_future = loop.create_future()
Guido van Rossum841d9ee2015-10-03 08:31:42 -0700482 _chain_future(future, new_future)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700483 return new_future
INADA Naokic411a7d2016-10-18 11:48:14 +0900484
485
486try:
487 import _asyncio
488except ImportError:
489 pass
490else:
491 Future = _asyncio.Future