blob: d06828a620441f22bce24c72ee822b33f41fdf99 [file] [log] [blame]
"""A Future class similar to the one in PEP 3148."""
2
# Public names exported by ``from <module> import *``.
__all__ = ['CancelledError', 'TimeoutError',
           'InvalidStateError',
           'Future', 'wrap_future',
           ]
7
8import concurrent.futures._base
9import logging
Victor Stinner313a9802014-07-29 12:58:23 +020010import reprlib
Victor Stinner4c3c6992013-12-19 22:42:40 +010011import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import traceback
13
14from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015
# States for Future.
_PENDING = 'PENDING'
_CANCELLED = 'CANCELLED'
_FINISHED = 'FINISHED'

# Interpreter version gates, evaluated once at import time.
# _PY34 selects how unretrieved exceptions are reported (Future.__del__ on
# 3.4+, the _TracebackLogger helper otherwise); _PY35 enables __await__.
_PY34 = sys.version_info >= (3, 4)
_PY35 = sys.version_info >= (3, 5)

# Reuse the concurrent.futures exception classes rather than defining
# new ones, so both packages share the same exception types.
Error = concurrent.futures._base.Error
CancelledError = concurrent.futures.CancelledError
TimeoutError = concurrent.futures.TimeoutError

STACK_DEBUG = logging.DEBUG - 1  # heavy-duty debugging
29
30
class InvalidStateError(Error):
    """The operation is not allowed in this state."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033
34
class _TracebackLogger:
    """Helper to log a traceback upon destruction if not cleared.

    This solves a nasty problem with Futures and Tasks that have an
    exception set: if nobody asks for the exception, the exception is
    never logged.  This violates the Zen of Python: 'Errors should
    never pass silently.  Unless explicitly silenced.'

    However, we don't want to log the exception as soon as
    set_exception() is called: if the calling code is written
    properly, it will get the exception and handle it properly.  But
    we *do* want to log it if result() or exception() was never called
    -- otherwise developers waste a lot of time wondering why their
    buggy code fails silently.

    An earlier attempt added a __del__() method to the Future class
    itself, but this backfired because the presence of __del__()
    prevents garbage collection from breaking cycles.  A way out of
    this catch-22 is to avoid having a __del__() method on the Future
    class itself, but instead to have a reference to a helper object
    with a __del__() method that logs the traceback, where we ensure
    that the helper object doesn't participate in cycles, and only the
    Future has a reference to it.

    The helper object is added when set_exception() is called.  When
    the Future is collected, and the helper is present, the helper
    object is also collected, and its __del__() method will log the
    traceback.  When the Future's result() or exception() method is
    called (and a helper object is present), it removes the helper
    object, after calling its clear() method to prevent it from
    logging.

    One downside is that we do a fair amount of work to extract the
    traceback from the exception, even when it is never logged.  It
    would seem cheaper to just store the exception object, but that
    references the traceback, which references stack frames, which may
    reference the Future, which references the _TracebackLogger, and
    then the _TracebackLogger would be included in a cycle, which is
    what we're trying to avoid!  As an optimization, we don't
    immediately format the exception; we only do the work when
    activate() is called, which call is delayed until after all the
    Future's callbacks have run.  Usually a Future has at least one
    callback (typically set by 'yield from') and usually that callback
    extracts the exception, thereby removing the need to format it.

    PS. I don't claim credit for this solution.  I first heard of it
    in a discussion about closing files when they are collected.
    """

    __slots__ = ('loop', 'source_traceback', 'exc', 'tb')

    def __init__(self, future, exc):
        """Remember the future's loop and creation traceback, and *exc*.

        Only attributes of *future* are stored, never the future itself,
        so this helper holds no reference back to its owner.
        """
        self.loop = future._loop
        self.source_traceback = future._source_traceback
        self.exc = exc
        self.tb = None

    def activate(self):
        """Format the pending exception into text and drop the exception.

        Does nothing if the exception was already formatted or cleared.
        """
        exc = self.exc
        if exc is not None:
            # Release the exception object before formatting so that only
            # plain strings are kept from here on.
            self.exc = None
            self.tb = traceback.format_exception(exc.__class__, exc,
                                                 exc.__traceback__)

    def clear(self):
        """Forget the exception and any formatted traceback.

        After this call __del__() reports nothing.
        """
        self.exc = None
        self.tb = None

    def __del__(self):
        """Report the formatted traceback, if any, to the loop's handler."""
        if self.tb:
            msg = 'Future/Task exception was never retrieved\n'
            if self.source_traceback:
                src = ''.join(traceback.format_list(self.source_traceback))
                msg += 'Future/Task created at (most recent call last):\n'
                msg += '%s\n' % src.rstrip()
            msg += ''.join(self.tb).rstrip()
            self.loop.call_exception_handler({'message': msg})
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700113
114
class Future:
    """This class is *almost* compatible with concurrent.futures.Future.

    Differences:

    - result() and exception() do not take a timeout argument and
      raise an exception when the future isn't done yet.

    - Callbacks registered with add_done_callback() are always called
      via the event loop's call_soon().

    - This class is not compatible with the wait() and as_completed()
      methods in the concurrent.futures package.

    (In Python 3.4 or later we may be able to unify the implementations.)
    """

    # Class variables serving as defaults for instance variables.
    _state = _PENDING
    _result = None
    _exception = None
    _loop = None
    _source_traceback = None

    _blocking = False  # proper use of future (yield vs yield from)

    _log_traceback = False   # Used for Python 3.4 and later
    _tb_logger = None        # Used for Python 3.3 only

    def __init__(self, *, loop=None):
        """Initialize the future.

        The optional loop argument allows explicitly setting the event
        loop object used by the future.  If it's not provided, the future
        uses the default event loop.
        """
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._callbacks = []
        if self._loop.get_debug():
            # Remember where the future was created (the caller's stack)
            # so it can be shown in repr() and in error reports.
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def _format_callbacks(self):
        """Return a 'cb=[...]' summary of the registered callbacks.

        At most the first and last callbacks are shown; any in between
        are collapsed into a '<N more>' placeholder.
        """
        cb = self._callbacks
        size = len(cb)
        if not size:
            cb = ''

        def format_cb(callback):
            return events._format_callback_source(callback, ())

        if size == 1:
            cb = format_cb(cb[0])
        elif size == 2:
            cb = '{}, {}'.format(format_cb(cb[0]), format_cb(cb[1]))
        elif size > 2:
            cb = '{}, <{} more>, {}'.format(format_cb(cb[0]),
                                            size - 2,
                                            format_cb(cb[-1]))
        return 'cb=[%s]' % cb

    def _repr_info(self):
        """Return the list of text fragments that make up repr(self)."""
        info = [self._state.lower()]
        if self._state == _FINISHED:
            if self._exception is not None:
                info.append('exception={!r}'.format(self._exception))
            else:
                # use reprlib to limit the length of the output, especially
                # for very long strings
                result = reprlib.repr(self._result)
                info.append('result={}'.format(result))
        if self._callbacks:
            info.append(self._format_callbacks())
        if self._source_traceback:
            frame = self._source_traceback[-1]
            info.append('created at %s:%s' % (frame[0], frame[1]))
        return info

    def __repr__(self):
        info = self._repr_info()
        return '<%s %s>' % (self.__class__.__name__, ' '.join(info))

    # On Python 3.3 and older, objects with a destructor that are part of a
    # reference cycle are never destroyed.  This is no longer the case on
    # Python 3.4 and later thanks to PEP 442.
    if _PY34:
        def __del__(self):
            """Report an exception that was set but never retrieved."""
            if not self._log_traceback:
                # set_exception() was not called, or result() or exception()
                # has consumed the exception
                return
            exc = self._exception
            context = {
                'message': ('%s exception was never retrieved'
                            % self.__class__.__name__),
                'exception': exc,
                'future': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)

    def cancel(self):
        """Cancel the future and schedule callbacks.

        If the future is already done or cancelled, return False.  Otherwise,
        change the future's state to cancelled, schedule the callbacks and
        return True.
        """
        if self._state != _PENDING:
            return False
        self._state = _CANCELLED
        self._schedule_callbacks()
        return True

    def _schedule_callbacks(self):
        """Internal: Ask the event loop to call all callbacks.

        The callbacks are scheduled to be called as soon as possible.  Also
        clears the callback list.
        """
        # Work on a copy: the list is emptied in place before scheduling.
        callbacks = self._callbacks[:]
        if not callbacks:
            return

        self._callbacks[:] = []
        for callback in callbacks:
            self._loop.call_soon(callback, self)

    def cancelled(self):
        """Return True if the future was cancelled."""
        return self._state == _CANCELLED

    # Don't implement running(); see http://bugs.python.org/issue18699

    def done(self):
        """Return True if the future is done.

        Done means either that a result / exception are available, or that the
        future was cancelled.
        """
        return self._state != _PENDING

    def result(self):
        """Return the result this future represents.

        If the future has been cancelled, raises CancelledError.  If the
        future's result isn't yet available, raises InvalidStateError.  If
        the future is done and has an exception set, this exception is raised.
        """
        if self._state == _CANCELLED:
            raise CancelledError
        if self._state != _FINISHED:
            raise InvalidStateError('Result is not ready.')
        # The outcome is being retrieved: disarm the "never retrieved"
        # reporting (both the 3.4+ flag and the 3.3 helper object).
        self._log_traceback = False
        if self._tb_logger is not None:
            self._tb_logger.clear()
            self._tb_logger = None
        if self._exception is not None:
            raise self._exception
        return self._result

    def exception(self):
        """Return the exception that was set on this future.

        The exception (or None if no exception was set) is returned only if
        the future is done.  If the future has been cancelled, raises
        CancelledError.  If the future isn't done yet, raises
        InvalidStateError.
        """
        if self._state == _CANCELLED:
            raise CancelledError
        if self._state != _FINISHED:
            raise InvalidStateError('Exception is not set.')
        # The outcome is being retrieved: disarm the "never retrieved"
        # reporting (both the 3.4+ flag and the 3.3 helper object).
        self._log_traceback = False
        if self._tb_logger is not None:
            self._tb_logger.clear()
            self._tb_logger = None
        return self._exception

    def add_done_callback(self, fn):
        """Add a callback to be run when the future becomes done.

        The callback is called with a single argument - the future object.  If
        the future is already done when this is called, the callback is
        scheduled with call_soon.
        """
        if self._state != _PENDING:
            self._loop.call_soon(fn, self)
        else:
            self._callbacks.append(fn)

    # New method not in PEP 3148.

    def remove_done_callback(self, fn):
        """Remove all instances of a callback from the "call when done" list.

        Returns the number of callbacks removed.
        """
        filtered_callbacks = [f for f in self._callbacks if f != fn]
        removed_count = len(self._callbacks) - len(filtered_callbacks)
        if removed_count:
            self._callbacks[:] = filtered_callbacks
        return removed_count

    # So-called internal methods (note: no set_running_or_notify_cancel()).

    def _set_result_unless_cancelled(self, result):
        """Helper setting the result only if the future was not cancelled."""
        if self.cancelled():
            return
        self.set_result(result)

    def set_result(self, result):
        """Mark the future done and set its result.

        If the future is already done when this method is called, raises
        InvalidStateError.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = _FINISHED
        self._schedule_callbacks()

    def set_exception(self, exception):
        """Mark the future done and set an exception.

        *exception* may be an exception instance or an exception class; a
        class is instantiated with no arguments.

        If the future is already done when this method is called, raises
        InvalidStateError.  Raises TypeError if the exception is a
        StopIteration.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        if isinstance(exception, type):
            exception = exception()
        if isinstance(exception, StopIteration):
            # result() re-raises the stored exception from inside the
            # generator-based __iter__(); a StopIteration raised there would
            # end the generator instead of propagating as an error.
            raise TypeError("StopIteration interacts badly with generators "
                            "and cannot be raised into a Future")
        self._exception = exception
        self._state = _FINISHED
        self._schedule_callbacks()
        if _PY34:
            self._log_traceback = True
        else:
            self._tb_logger = _TracebackLogger(self, exception)
            # Arrange for the logger to be activated after all callbacks
            # have had a chance to call result() or exception().
            self._loop.call_soon(self._tb_logger.activate)

    # Truly internal methods.

    def _copy_state(self, other):
        """Internal helper to copy state from another Future.

        The other Future may be a concurrent.futures.Future.
        """
        assert other.done()
        if self.cancelled():
            return
        assert not self.done()
        if other.cancelled():
            self.cancel()
        else:
            exception = other.exception()
            if exception is not None:
                self.set_exception(exception)
            else:
                result = other.result()
                self.set_result(result)

    def __iter__(self):
        """Support 'yield from future': wait if needed, then give the result."""
        if not self.done():
            self._blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.

    if _PY35:
        __await__ = __iter__  # make compatible with 'await' expression
393
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700394
def wrap_future(fut, *, loop=None):
    """Wrap concurrent.futures.Future object.

    If *fut* is already a Future of this module it is returned unchanged.
    Otherwise a new Future attached to *loop* (the default event loop when
    *loop* is None) is returned; it receives a copy of *fut*'s state once
    *fut* completes.  Cancelling the returned future also cancels *fut*.
    """
    if isinstance(fut, Future):
        return fut
    assert isinstance(fut, concurrent.futures.Future), \
        'concurrent.futures.Future is expected, got {!r}'.format(fut)
    if loop is None:
        loop = events.get_event_loop()
    new_future = Future(loop=loop)

    def _check_cancel_other(f):
        # Propagate cancellation of the wrapper to the wrapped future.
        if f.cancelled():
            fut.cancel()

    new_future.add_done_callback(_check_cancel_other)
    # Hand the state copy over to the loop with call_soon_threadsafe()
    # instead of running it directly inside fut's done callback.
    fut.add_done_callback(
        lambda future: loop.call_soon_threadsafe(
            new_future._copy_state, future))
    return new_future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700413 return new_future