blob: 7998fbbcfbf21595238a4df6fa34e80faf58c6d0 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""A Future class similar to the one in PEP 3148."""
2
3__all__ = ['CancelledError', 'TimeoutError',
4 'InvalidStateError',
5 'Future', 'wrap_future',
6 ]
7
8import concurrent.futures._base
9import logging
Victor Stinner313a9802014-07-29 12:58:23 +020010import reprlib
Victor Stinner4c3c6992013-12-19 22:42:40 +010011import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import traceback
13
14from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015
# The three lifecycle states a Future can be in.
_PENDING = 'PENDING'
_CANCELLED = 'CANCELLED'
_FINISHED = 'FINISHED'

# Python 3.4+ can finalize objects that are part of a reference cycle
# (PEP 442); Future picks its "exception never retrieved" reporting
# strategy based on this flag.
_PY34 = sys.version_info >= (3, 4)

# Exception types are shared with concurrent.futures.
# TODO: Do we really want to depend on concurrent.futures internals?
Error = concurrent.futures._base.Error
CancelledError = concurrent.futures.CancelledError
TimeoutError = concurrent.futures.TimeoutError

# Custom logging level sitting just below DEBUG.
STACK_DEBUG = logging.DEBUG - 1  # heavy-duty debugging
29
30
class InvalidStateError(Error):
    """Raised when an operation is not permitted in the current state."""
    # TODO: Show the future, its state, the method, and the required state.
34
35
class _TracebackLogger:
    """Helper to log a traceback upon destruction if not cleared.

    This solves a nasty problem with Futures and Tasks that have an
    exception set: if nobody asks for the exception, the exception is
    never logged.  This violates the Zen of Python: 'Errors should
    never pass silently.  Unless explicitly silenced.'

    However, we don't want to log the exception as soon as
    set_exception() is called: if the calling code is written
    properly, it will get the exception and handle it properly.  But
    we *do* want to log it if result() or exception() was never called
    -- otherwise developers waste a lot of time wondering why their
    buggy code fails silently.

    An earlier attempt added a __del__() method to the Future class
    itself, but this backfired because the presence of __del__()
    prevents garbage collection from breaking cycles.  A way out of
    this catch-22 is to avoid having a __del__() method on the Future
    class itself, but instead to have a reference to a helper object
    with a __del__() method that logs the traceback, where we ensure
    that the helper object doesn't participate in cycles, and only the
    Future has a reference to it.

    The helper object is added when set_exception() is called.  When
    the Future is collected, and the helper is present, the helper
    object is also collected, and its __del__() method will log the
    traceback.  When the Future's result() or exception() method is
    called (and a helper object is present), it removes the helper
    object, after calling its clear() method to prevent it from
    logging.

    One downside is that we do a fair amount of work to extract the
    traceback from the exception, even when it is never logged.  It
    would seem cheaper to just store the exception object, but that
    references the traceback, which references stack frames, which may
    reference the Future, which references the _TracebackLogger, and
    then the _TracebackLogger would be included in a cycle, which is
    what we're trying to avoid!  As an optimization, we don't
    immediately format the exception; we only do the work when
    activate() is called, which call is delayed until after all the
    Future's callbacks have run.  Usually a Future has at least one
    callback (typically set by 'yield from') and usually that callback
    extracts the exception, thereby removing the need to format it.

    PS. I don't claim credit for this solution.  I first heard of it
    in a discussion about closing files when they are collected.
    """

    __slots__ = ('loop', 'source_traceback', 'exc', 'tb')

    def __init__(self, future, exc):
        """Remember the future's loop and creation traceback, plus *exc*.

        Only the loop and the source traceback are copied from *future*;
        the future itself is not stored.
        """
        self.loop = future._loop
        self.source_traceback = future._source_traceback
        self.exc = exc
        self.tb = None

    def activate(self):
        """Format the pending exception and drop the reference to it.

        Does nothing if the exception was already cleared or formatted.
        """
        exc = self.exc
        if exc is not None:
            # Keep only the formatted text from now on, not the exception.
            self.exc = None
            self.tb = traceback.format_exception(exc.__class__, exc,
                                                 exc.__traceback__)

    def clear(self):
        """Forget the exception and its traceback so nothing is logged."""
        self.exc = None
        self.tb = None

    def __del__(self):
        """Report the formatted traceback to the loop, if one remains."""
        if self.tb:
            # The header always ends with a newline so that whatever
            # follows (creation traceback or exception traceback) starts
            # on its own line.
            msg = 'Future/Task exception was never retrieved\n'
            if self.source_traceback:
                src = ''.join(traceback.format_list(self.source_traceback))
                msg += 'Future/Task created at (most recent call last):\n'
                msg += '%s\n' % src.rstrip()
            msg += ''.join(self.tb).rstrip()
            self.loop.call_exception_handler({'message': msg})
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700113
114
class Future:
    """This class is *almost* compatible with concurrent.futures.Future.

    Differences:

    - result() and exception() accept no timeout argument; they raise
      an exception when the future isn't done yet.

    - Callbacks registered with add_done_callback() are never invoked
      directly: they are always scheduled via the event loop's
      call_soon().

    - This class is not compatible with the wait() and as_completed()
      methods in the concurrent.futures package.

    (In Python 3.4 or later we may be able to unify the implementations.)
    """

    # Class variables serving as defaults for instance variables.
    _state = _PENDING
    _result = None
    _exception = None
    _loop = None

    _blocking = False  # proper use of future (yield vs yield from)

    _log_traceback = False   # Used for Python 3.4 and later
    _tb_logger = None        # Used for Python 3.3 only

    def __init__(self, *, loop=None):
        """Initialize the future.

        The optional keyword-only *loop* argument explicitly sets the
        event loop used by the future.  When it is omitted, the loop
        returned by events.get_event_loop() is used.
        """
        self._loop = events.get_event_loop() if loop is None else loop
        self._callbacks = []
        # Record the creation site only when the loop is in debug mode.
        self._source_traceback = (
            traceback.extract_stack(sys._getframe(1))
            if self._loop.get_debug() else None)

    def _format_callbacks(self):
        """Return a 'cb=[...]' summary of the registered callbacks.

        At most the first and the last callback are spelled out; the
        ones in between are abbreviated as '<N more>'.
        """
        callbacks = self._callbacks
        count = len(callbacks)

        def describe(callback):
            return events._format_callback(callback, ())

        if count == 0:
            text = ''
        elif count == 1:
            text = describe(callbacks[0])
        elif count == 2:
            text = '{}, {}'.format(describe(callbacks[0]),
                                   describe(callbacks[1]))
        else:
            text = '{}, <{} more>, {}'.format(describe(callbacks[0]),
                                              count - 2,
                                              describe(callbacks[-1]))
        return 'cb=[%s]' % text

    def _repr_info(self):
        """Return the list of text fragments that make up the repr."""
        parts = [self._state.lower()]
        if self._state == _FINISHED:
            if self._exception is None:
                # reprlib keeps the output short, especially for very
                # long strings.
                parts.append('result={}'.format(reprlib.repr(self._result)))
            else:
                parts.append('exception={!r}'.format(self._exception))
        if self._callbacks:
            parts.append(self._format_callbacks())
        if self._source_traceback:
            # The last entry is the frame that created the future.
            last = self._source_traceback[-1]
            parts.append('created at %s:%s' % (last[0], last[1]))
        return parts

    def __repr__(self):
        details = ' '.join(self._repr_info())
        return '<%s %s>' % (self.__class__.__name__, details)

    # On Python 3.3 or older, objects with a destructor that are part of
    # a reference cycle are never destroyed.  This is no longer the case
    # on Python 3.4 thanks to PEP 442, so __del__() is only defined there.
    if _PY34:
        def __del__(self):
            """Report an exception that nobody retrieved to the loop."""
            if self._log_traceback:
                # set_exception() was called and neither result() nor
                # exception() has consumed the exception since.
                context = {
                    'message': ('%s exception was never retrieved'
                                % self.__class__.__name__),
                    'exception': self._exception,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)

    def cancel(self):
        """Cancel the future and schedule callbacks.

        Return False without doing anything if the future is no longer
        pending.  Otherwise switch the state to cancelled, schedule the
        callbacks and return True.
        """
        if self._state == _PENDING:
            self._state = _CANCELLED
            self._schedule_callbacks()
            return True
        return False

    def _schedule_callbacks(self):
        """Internal: Ask the event loop to call all callbacks.

        Every registered callback is handed to the loop's call_soon()
        with this future as its argument, and the callback list is
        emptied in place.
        """
        pending = list(self._callbacks)
        if pending:
            del self._callbacks[:]
            for callback in pending:
                self._loop.call_soon(callback, self)

    def cancelled(self):
        """Return True if the future was cancelled."""
        return self._state == _CANCELLED

    # Don't implement running(); see http://bugs.python.org/issue18699

    def done(self):
        """Return True if the future is done.

        Done means the future is no longer pending: a result or an
        exception is available, or the future was cancelled.
        """
        return not self._state == _PENDING

    def result(self):
        """Return the result this future represents.

        Raises CancelledError if the future has been cancelled and
        InvalidStateError if the result isn't available yet.  If the
        future is done and has an exception set, that exception is
        raised.
        """
        state = self._state
        if state == _CANCELLED:
            raise CancelledError
        if state != _FINISHED:
            raise InvalidStateError('Result is not ready.')
        # The outcome is being consumed: disable "never retrieved"
        # reporting on both the 3.4+ and the 3.3 code paths.
        self._log_traceback = False
        tb_logger = self._tb_logger
        if tb_logger is not None:
            tb_logger.clear()
            self._tb_logger = None
        exc = self._exception
        if exc is None:
            return self._result
        raise exc

    def exception(self):
        """Return the exception that was set on this future.

        The exception (or None if no exception was set) is returned
        only if the future is done.  Raises CancelledError if the
        future has been cancelled and InvalidStateError if it isn't
        done yet.
        """
        state = self._state
        if state == _CANCELLED:
            raise CancelledError
        if state != _FINISHED:
            raise InvalidStateError('Exception is not set.')
        # The outcome is being consumed: disable "never retrieved"
        # reporting on both the 3.4+ and the 3.3 code paths.
        self._log_traceback = False
        tb_logger = self._tb_logger
        if tb_logger is not None:
            tb_logger.clear()
            self._tb_logger = None
        return self._exception

    def add_done_callback(self, fn):
        """Add a callback to be run when the future becomes done.

        The callback receives a single argument, the future object.  If
        the future is no longer pending when this is called, the
        callback is scheduled right away with call_soon().
        """
        if self._state == _PENDING:
            self._callbacks.append(fn)
        else:
            self._loop.call_soon(fn, self)

    # New method not in PEP 3148.

    def remove_done_callback(self, fn):
        """Remove all instances of a callback from the "call when done" list.

        Returns the number of callbacks removed.
        """
        kept = [callback for callback in self._callbacks if callback != fn]
        removed = len(self._callbacks) - len(kept)
        if removed:
            self._callbacks[:] = kept
        return removed

    # So-called internal methods (note: no set_running_or_notify_cancel()).

    def _set_result_unless_cancelled(self, result):
        """Helper setting the result only if the future was not cancelled."""
        if not self.cancelled():
            self.set_result(result)

    def set_result(self, result):
        """Mark the future done and set its result.

        Raises InvalidStateError if the future is no longer pending
        when this method is called.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = _FINISHED
        self._schedule_callbacks()

    def set_exception(self, exception):
        """Mark the future done and set an exception.

        An exception class is instantiated (without arguments) before
        being stored.  Raises InvalidStateError if the future is no
        longer pending when this method is called.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        if isinstance(exception, type):
            exception = exception()
        self._exception = exception
        self._state = _FINISHED
        self._schedule_callbacks()
        if _PY34:
            self._log_traceback = True
            return
        tb_logger = _TracebackLogger(self, exception)
        self._tb_logger = tb_logger
        # Arrange for the logger to be activated after all callbacks
        # have had a chance to call result() or exception().
        self._loop.call_soon(tb_logger.activate)

    # Truly internal methods.

    def _copy_state(self, other):
        """Internal helper to copy state from another Future.

        The other Future may be a concurrent.futures.Future.  Nothing
        is copied if this future has already been cancelled.
        """
        assert other.done()
        if self.cancelled():
            return
        assert not self.done()
        if other.cancelled():
            self.cancel()
            return
        exc = other.exception()
        if exc is None:
            self.set_result(other.result())
        else:
            self.set_exception(exc)

    def __iter__(self):
        """Yield the future once if it is pending, then return its result."""
        if not self.done():
            self._blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.
391
392
def wrap_future(fut, *, loop=None):
    """Wrap concurrent.futures.Future object.

    A Future of this module is returned unchanged.  Otherwise a new
    Future bound to *loop* (default: events.get_event_loop()) is
    returned; it receives the state of *fut* once *fut* is done, and
    cancelling it also cancels *fut*.
    """
    if isinstance(fut, Future):
        return fut
    assert isinstance(fut, concurrent.futures.Future), \
        'concurrent.futures.Future is expected, got {!r}'.format(fut)
    if loop is None:
        loop = events.get_event_loop()
    new_future = Future(loop=loop)

    def _check_cancel_other(f):
        # Propagate cancellation of the wrapper to the wrapped future.
        if f.cancelled():
            fut.cancel()

    def _schedule_copy(future):
        # Hand the state copy over to the loop through its thread-safe
        # scheduling entry point.
        return loop.call_soon_threadsafe(new_future._copy_state, fut)

    new_future.add_done_callback(_check_cancel_other)
    fut.add_done_callback(_schedule_copy)
    return new_future