blob: 91ea1706618915aa28572f0fedd8a0dd8293ff8d [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""A Future class similar to the one in PEP 3148."""
2
3__all__ = ['CancelledError', 'TimeoutError',
4 'InvalidStateError',
5 'Future', 'wrap_future',
6 ]
7
8import concurrent.futures._base
9import logging
Victor Stinner4c3c6992013-12-19 22:42:40 +010010import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011import traceback
12
13from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014
# States for Future.  A Future starts out as _PENDING (the class-level
# default of Future._state) and moves to _CANCELLED via cancel() or to
# _FINISHED via set_result() / set_exception().
_PENDING = 'PENDING'
_CANCELLED = 'CANCELLED'
_FINISHED = 'FINISHED'

# True on Python 3.4 and later.  Future uses this flag to decide how an
# exception that was never retrieved gets reported: through its own
# __del__() method when True, through a _TracebackLogger helper otherwise.
_PY34 = sys.version_info >= (3, 4)

# TODO: Do we really want to depend on concurrent.futures internals?
# The exception classes below are aliases of the concurrent.futures ones
# rather than new classes.
Error = concurrent.futures._base.Error
CancelledError = concurrent.futures.CancelledError
TimeoutError = concurrent.futures.TimeoutError

# Numeric logging level one step below logging.DEBUG.
STACK_DEBUG = logging.DEBUG - 1  # heavy-duty debugging
28
29
class InvalidStateError(Error):
    """Raised when a Future is used in a state that forbids the operation.

    Examples are asking for the result of a future that is not done yet,
    or setting a result on a future that is no longer pending.
    """
    # TODO: Show the future, its state, the method, and the required state.
33
34
class _TracebackLogger:
    """Report a stored exception at destruction time unless it was cleared.

    Why this exists: when a Future or Task has an exception set and
    nobody ever asks for it, the exception would vanish without a trace.
    That violates the Zen of Python: 'Errors should never pass silently.
    Unless explicitly silenced.'

    Reporting the exception right away in set_exception() would be
    wrong, because well-behaved code is going to retrieve and handle it.
    The report is only wanted when result() or exception() was never
    called -- otherwise developers waste a lot of time wondering why
    their buggy code fails silently.

    Why a separate helper: an earlier attempt put __del__() on the
    Future class itself, which backfired because the presence of
    __del__() prevents garbage collection from breaking cycles.  The way
    out is to keep __del__() off the Future and instead give the Future
    a reference to this helper, whose __del__() does the reporting.  The
    helper must not take part in reference cycles, and only the Future
    may hold a reference to it.

    Life cycle: the helper is created when set_exception() is called.
    If the Future is collected while the helper is still attached, the
    helper is collected too and its __del__() reports the traceback.
    When the Future's result() or exception() method is called, it
    calls clear() on the helper and then drops it, so nothing is
    reported.

    Why the traceback is formatted eagerly-ish: it would seem cheaper to
    keep just the exception object, but the exception references its
    traceback, which references stack frames, which may reference the
    Future, which references this helper -- a cycle, which is exactly
    what must be avoided.  As an optimization the exception is not
    formatted immediately; the work only happens in activate(), and that
    call is delayed until after all of the Future's callbacks have run.
    A Future usually has at least one callback (typically set by
    'yield from'), and that callback usually retrieves the exception,
    which removes the need to format it at all.

    PS. I don't claim credit for this solution.  I first heard of it
    in a discussion about closing files when they are collected.
    """

    __slots__ = ['exc', 'tb', 'loop']

    def __init__(self, exc, loop):
        # exc:  the exception to report if it is never retrieved.
        # loop: object whose call_exception_handler() receives the report.
        self.exc = exc
        self.tb = None
        self.loop = loop

    def activate(self):
        """Turn the stored exception into formatted traceback lines.

        The exception object itself is released so that only plain
        strings are kept.  Does nothing if there is no stored exception.
        """
        pending = self.exc
        if pending is None:
            return
        self.exc = None
        self.tb = traceback.format_exception(
            pending.__class__, pending, pending.__traceback__)

    def clear(self):
        """Forget the exception and any formatted traceback."""
        self.tb = None
        self.exc = None

    def __del__(self):
        # Nothing to report unless activate() produced a traceback that
        # was not cleared afterwards.
        if not self.tb:
            return
        msg = 'Future/Task exception was never retrieved:\n{tb}'
        formatted = ''.join(self.tb)
        self.loop.call_exception_handler({
            'message': msg.format(tb=formatted),
        })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700110
111
class Future:
    """A result holder tied to an event loop.

    This class is *almost* compatible with concurrent.futures.Future.

    Differences:

    - result() and exception() take no timeout argument; calling them
      before the future is done raises an exception.

    - Callbacks registered with add_done_callback() are never invoked
      directly; they are always handed to the event loop's call_soon().

    - This class is not compatible with the wait() and as_completed()
      methods in the concurrent.futures package.

    (In Python 3.4 or later we may be able to unify the implementations.)
    """

    # Class-level defaults that double as initial instance state.
    _state = _PENDING
    _result = None
    _exception = None
    _loop = None

    # Set to True by __iter__() right before it yields the future itself
    # (proper use of future: yield vs yield from).
    _blocking = False

    _log_traceback = False  # Used for Python 3.4 and later
    _tb_logger = None  # Used for Python 3.3 only

    def __init__(self, *, loop=None):
        """Initialize the future.

        The optional keyword-only loop argument explicitly sets the event
        loop object used by the future.  When it is omitted (or None), the
        loop returned by events.get_event_loop() is used.
        """
        self._loop = events.get_event_loop() if loop is None else loop
        self._callbacks = []

    def __repr__(self):
        name = self.__class__.__name__
        if self._state == _FINISHED:
            # A finished future shows its outcome.
            if self._exception is None:
                return name + '<result={!r}>'.format(self._result)
            return name + '<exception={!r}>'.format(self._exception)
        if not self._callbacks:
            return name + '<{}>'.format(self._state)
        count = len(self._callbacks)
        if count <= 2:
            return name + '<{}, {}>'.format(self._state, self._callbacks)
        # Long callback lists are abbreviated to first / "N more" / last.
        return name + '<{}, [{}, <{} more>, {}]>'.format(
            self._state, self._callbacks[0],
            count - 2, self._callbacks[-1])

    if _PY34:
        def __del__(self):
            # The flag is only set by set_exception() and is reset by
            # result() and exception(), so it being set here means the
            # exception was stored but never consumed.
            if self._log_traceback:
                self._loop.call_exception_handler({
                    'message': 'Future/Task exception was never retrieved',
                    'exception': self._exception,
                    'future': self,
                })

    def cancel(self):
        """Cancel the future and schedule callbacks.

        Returns False without doing anything when the future is already
        done or cancelled.  Otherwise the state becomes cancelled, the
        callbacks are scheduled and True is returned.
        """
        if self._state == _PENDING:
            self._state = _CANCELLED
            self._schedule_callbacks()
            return True
        return False

    def _schedule_callbacks(self):
        """Internal: Ask the event loop to call all callbacks.

        Every registered callback is passed to the loop's call_soon()
        with this future as its argument, and the callback list is
        emptied.
        """
        pending = self._callbacks[:]
        if pending:
            # Empty the list in place before handing the callbacks over.
            del self._callbacks[:]
            for fn in pending:
                self._loop.call_soon(fn, self)

    def cancelled(self):
        """Return True if the future was cancelled."""
        return self._state == _CANCELLED

    # Don't implement running(); see http://bugs.python.org/issue18699

    def done(self):
        """Return True if the future is done.

        Done means that the future is no longer pending: a result or an
        exception is available, or the future was cancelled.
        """
        return self._state != _PENDING

    def result(self):
        """Return the result this future represents.

        Raises CancelledError if the future has been cancelled and
        InvalidStateError if the result isn't available yet.  If the
        future is done and has an exception set, that exception is
        raised.
        """
        state = self._state
        if state == _CANCELLED:
            raise CancelledError
        if state != _FINISHED:
            raise InvalidStateError('Result is not ready.')
        # The outcome is being consumed: switch off "never retrieved"
        # reporting for both the 3.4+ and the 3.3 mechanism.
        self._log_traceback = False
        tb_logger = self._tb_logger
        if tb_logger is not None:
            tb_logger.clear()
            self._tb_logger = None
        exc = self._exception
        if exc is not None:
            raise exc
        return self._result

    def exception(self):
        """Return the exception that was set on this future.

        The exception (or None if no exception was set) is returned only
        if the future is done.  Raises CancelledError if the future has
        been cancelled and InvalidStateError if it isn't done yet.
        """
        state = self._state
        if state == _CANCELLED:
            raise CancelledError
        if state != _FINISHED:
            raise InvalidStateError('Exception is not set.')
        # The outcome is being consumed: switch off "never retrieved"
        # reporting for both the 3.4+ and the 3.3 mechanism.
        self._log_traceback = False
        tb_logger = self._tb_logger
        if tb_logger is not None:
            tb_logger.clear()
            self._tb_logger = None
        return self._exception

    def add_done_callback(self, fn):
        """Add a callback to be run when the future becomes done.

        The callback is called with a single argument - the future
        object.  If the future is already done when this is called, the
        callback is scheduled with call_soon right away.
        """
        if self._state == _PENDING:
            self._callbacks.append(fn)
        else:
            self._loop.call_soon(fn, self)

    # New method not in PEP 3148.

    def remove_done_callback(self, fn):
        """Remove all instances of a callback from the "call when done" list.

        Returns the number of callbacks removed.
        """
        before = len(self._callbacks)
        kept = [cb for cb in self._callbacks if cb != fn]
        removed = before - len(kept)
        if removed:
            self._callbacks[:] = kept
        return removed

    # So-called internal methods (note: no set_running_or_notify_cancel()).

    def set_result(self, result):
        """Mark the future done and set its result.

        Raises InvalidStateError if the future is already done when this
        method is called.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._state = _FINISHED
        self._result = result
        self._schedule_callbacks()

    def set_exception(self, exception):
        """Mark the future done and set an exception.

        An exception class may be passed instead of an instance; it is
        instantiated without arguments.  Raises InvalidStateError if the
        future is already done when this method is called.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        if isinstance(exception, type):
            exception = exception()
        self._exception = exception
        self._state = _FINISHED
        self._schedule_callbacks()
        if _PY34:
            # __del__() reports the exception if it is never retrieved.
            self._log_traceback = True
            return
        tb_logger = _TracebackLogger(exception, self._loop)
        self._tb_logger = tb_logger
        # Arrange for the logger to be activated after all callbacks
        # have had a chance to call result() or exception().
        self._loop.call_soon(tb_logger.activate)

    # Truly internal methods.

    def _copy_state(self, other):
        """Internal helper to copy state from another Future.

        The other Future may be a concurrent.futures.Future.  Nothing is
        copied when this future has already been cancelled.
        """
        assert other.done()
        if self.cancelled():
            return
        assert not self.done()
        if other.cancelled():
            self.cancel()
            return
        exc = other.exception()
        if exc is None:
            self.set_result(other.result())
        else:
            self.set_exception(exc)

    def __iter__(self):
        if not self.done():
            self._blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.
351
352
def wrap_future(fut, *, loop=None):
    """Wrap concurrent.futures.Future object.

    If fut is already a Future of this module it is returned unchanged.
    Otherwise a new Future bound to loop (or to the loop returned by
    events.get_event_loop() when loop is None) is created and returned.
    The two are linked in both directions: cancelling the new future
    cancels fut, and when fut completes its state is copied into the
    new future through the loop's call_soon_threadsafe().
    """
    if isinstance(fut, Future):
        return fut
    assert isinstance(fut, concurrent.futures.Future), \
        'concurrent.futures.Future is expected, got {!r}'.format(fut)
    if loop is None:
        loop = events.get_event_loop()
    wrapper = Future(loop=loop)

    def _check_cancel_other(done_future):
        # Forward a cancellation of the wrapper to the wrapped future.
        if done_future.cancelled():
            fut.cancel()

    def _copy_when_done(finished):
        # The state is copied via the loop's thread-safe scheduling call
        # rather than directly from within this callback.
        return loop.call_soon_threadsafe(wrapper._copy_state, fut)

    wrapper.add_done_callback(_check_cancel_other)
    fut.add_done_callback(_copy_when_done)
    return wrapper