blob: 9ee13e3e2b79410fb789de91a98dcdca56dc4a41 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""A Future class similar to the one in PEP 3148."""
2
3__all__ = ['CancelledError', 'TimeoutError',
4 'InvalidStateError',
5 'Future', 'wrap_future',
6 ]
7
8import concurrent.futures._base
9import logging
Victor Stinner4c3c6992013-12-19 22:42:40 +010010import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011import traceback
12
13from . import events
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070014from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015
16# States for Future.
17_PENDING = 'PENDING'
18_CANCELLED = 'CANCELLED'
19_FINISHED = 'FINISHED'
20
Victor Stinner4c3c6992013-12-19 22:42:40 +010021_PY34 = sys.version_info >= (3, 4)
22
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023# TODO: Do we really want to depend on concurrent.futures internals?
24Error = concurrent.futures._base.Error
25CancelledError = concurrent.futures.CancelledError
26TimeoutError = concurrent.futures.TimeoutError
27
28STACK_DEBUG = logging.DEBUG - 1 # heavy-duty debugging
29
30
31class InvalidStateError(Error):
32 """The operation is not allowed in this state."""
33 # TODO: Show the future, its state, the method, and the required state.
34
35
36class _TracebackLogger:
37 """Helper to log a traceback upon destruction if not cleared.
38
39 This solves a nasty problem with Futures and Tasks that have an
40 exception set: if nobody asks for the exception, the exception is
41 never logged. This violates the Zen of Python: 'Errors should
42 never pass silently. Unless explicitly silenced.'
43
44 However, we don't want to log the exception as soon as
45 set_exception() is called: if the calling code is written
46 properly, it will get the exception and handle it properly. But
47 we *do* want to log it if result() or exception() was never called
48 -- otherwise developers waste a lot of time wondering why their
49 buggy code fails silently.
50
51 An earlier attempt added a __del__() method to the Future class
52 itself, but this backfired because the presence of __del__()
53 prevents garbage collection from breaking cycles. A way out of
54 this catch-22 is to avoid having a __del__() method on the Future
55 class itself, but instead to have a reference to a helper object
56 with a __del__() method that logs the traceback, where we ensure
57 that the helper object doesn't participate in cycles, and only the
58 Future has a reference to it.
59
60 The helper object is added when set_exception() is called. When
61 the Future is collected, and the helper is present, the helper
62 object is also collected, and its __del__() method will log the
63 traceback. When the Future's result() or exception() method is
64 called (and a helper object is present), it removes the the helper
65 object, after calling its clear() method to prevent it from
66 logging.
67
68 One downside is that we do a fair amount of work to extract the
69 traceback from the exception, even when it is never logged. It
70 would seem cheaper to just store the exception object, but that
71 references the traceback, which references stack frames, which may
72 reference the Future, which references the _TracebackLogger, and
73 then the _TracebackLogger would be included in a cycle, which is
74 what we're trying to avoid! As an optimization, we don't
75 immediately format the exception; we only do the work when
76 activate() is called, which call is delayed until after all the
77 Future's callbacks have run. Since usually a Future has at least
78 one callback (typically set by 'yield from') and usually that
79 callback extracts the callback, thereby removing the need to
80 format the exception.
81
82 PS. I don't claim credit for this solution. I first heard of it
83 in a discussion about closing files when they are collected.
84 """
85
86 __slots__ = ['exc', 'tb']
87
88 def __init__(self, exc):
89 self.exc = exc
90 self.tb = None
91
92 def activate(self):
93 exc = self.exc
94 if exc is not None:
95 self.exc = None
96 self.tb = traceback.format_exception(exc.__class__, exc,
97 exc.__traceback__)
98
99 def clear(self):
100 self.exc = None
101 self.tb = None
102
103 def __del__(self):
104 if self.tb:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700105 logger.error('Future/Task exception was never retrieved:\n%s',
106 ''.join(self.tb))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700107
108
109class Future:
110 """This class is *almost* compatible with concurrent.futures.Future.
111
112 Differences:
113
114 - result() and exception() do not take a timeout argument and
115 raise an exception when the future isn't done yet.
116
117 - Callbacks registered with add_done_callback() are always called
118 via the event loop's call_soon_threadsafe().
119
120 - This class is not compatible with the wait() and as_completed()
121 methods in the concurrent.futures package.
122
123 (In Python 3.4 or later we may be able to unify the implementations.)
124 """
125
126 # Class variables serving as defaults for instance variables.
127 _state = _PENDING
128 _result = None
129 _exception = None
130 _loop = None
131
132 _blocking = False # proper use of future (yield vs yield from)
133
Victor Stinnere40c0782013-12-21 00:19:33 +0100134 _log_traceback = False # Used for Python 3.4 and later
135 _tb_logger = None # Used for Python 3.3 only
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700136
137 def __init__(self, *, loop=None):
138 """Initialize the future.
139
140 The optional event_loop argument allows to explicitly set the event
141 loop object used by the future. If it's not provided, the future uses
142 the default event loop.
143 """
144 if loop is None:
145 self._loop = events.get_event_loop()
146 else:
147 self._loop = loop
148 self._callbacks = []
149
150 def __repr__(self):
151 res = self.__class__.__name__
152 if self._state == _FINISHED:
153 if self._exception is not None:
154 res += '<exception={!r}>'.format(self._exception)
155 else:
156 res += '<result={!r}>'.format(self._result)
157 elif self._callbacks:
158 size = len(self._callbacks)
159 if size > 2:
160 res += '<{}, [{}, <{} more>, {}]>'.format(
161 self._state, self._callbacks[0],
162 size-2, self._callbacks[-1])
163 else:
164 res += '<{}, {}>'.format(self._state, self._callbacks)
165 else:
166 res += '<{}>'.format(self._state)
167 return res
168
Victor Stinner4c3c6992013-12-19 22:42:40 +0100169 if _PY34:
170 def __del__(self):
Victor Stinnere40c0782013-12-21 00:19:33 +0100171 if not self._log_traceback:
172 # set_exception() was not called, or result() or exception()
173 # has consumed the exception
174 return
175 exc = self._exception
176 logger.error('Future/Task exception was never retrieved:',
177 exc_info=(exc.__class__, exc, exc.__traceback__))
Victor Stinner4c3c6992013-12-19 22:42:40 +0100178
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700179 def cancel(self):
180 """Cancel the future and schedule callbacks.
181
182 If the future is already done or cancelled, return False. Otherwise,
183 change the future's state to cancelled, schedule the callbacks and
184 return True.
185 """
186 if self._state != _PENDING:
187 return False
188 self._state = _CANCELLED
189 self._schedule_callbacks()
190 return True
191
192 def _schedule_callbacks(self):
193 """Internal: Ask the event loop to call all callbacks.
194
195 The callbacks are scheduled to be called as soon as possible. Also
196 clears the callback list.
197 """
198 callbacks = self._callbacks[:]
199 if not callbacks:
200 return
201
202 self._callbacks[:] = []
203 for callback in callbacks:
204 self._loop.call_soon(callback, self)
205
206 def cancelled(self):
207 """Return True if the future was cancelled."""
208 return self._state == _CANCELLED
209
210 # Don't implement running(); see http://bugs.python.org/issue18699
211
212 def done(self):
213 """Return True if the future is done.
214
215 Done means either that a result / exception are available, or that the
216 future was cancelled.
217 """
218 return self._state != _PENDING
219
220 def result(self):
221 """Return the result this future represents.
222
223 If the future has been cancelled, raises CancelledError. If the
224 future's result isn't yet available, raises InvalidStateError. If
225 the future is done and has an exception set, this exception is raised.
226 """
227 if self._state == _CANCELLED:
228 raise CancelledError
229 if self._state != _FINISHED:
230 raise InvalidStateError('Result is not ready.')
Victor Stinnere40c0782013-12-21 00:19:33 +0100231 self._log_traceback = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700232 if self._tb_logger is not None:
233 self._tb_logger.clear()
Victor Stinnere40c0782013-12-21 00:19:33 +0100234 self._tb_logger = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700235 if self._exception is not None:
236 raise self._exception
237 return self._result
238
239 def exception(self):
240 """Return the exception that was set on this future.
241
242 The exception (or None if no exception was set) is returned only if
243 the future is done. If the future has been cancelled, raises
244 CancelledError. If the future isn't done yet, raises
245 InvalidStateError.
246 """
247 if self._state == _CANCELLED:
248 raise CancelledError
249 if self._state != _FINISHED:
250 raise InvalidStateError('Exception is not set.')
Victor Stinnere40c0782013-12-21 00:19:33 +0100251 self._log_traceback = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700252 if self._tb_logger is not None:
253 self._tb_logger.clear()
Victor Stinnere40c0782013-12-21 00:19:33 +0100254 self._tb_logger = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700255 return self._exception
256
257 def add_done_callback(self, fn):
258 """Add a callback to be run when the future becomes done.
259
260 The callback is called with a single argument - the future object. If
261 the future is already done when this is called, the callback is
262 scheduled with call_soon.
263 """
264 if self._state != _PENDING:
265 self._loop.call_soon(fn, self)
266 else:
267 self._callbacks.append(fn)
268
269 # New method not in PEP 3148.
270
271 def remove_done_callback(self, fn):
272 """Remove all instances of a callback from the "call when done" list.
273
274 Returns the number of callbacks removed.
275 """
276 filtered_callbacks = [f for f in self._callbacks if f != fn]
277 removed_count = len(self._callbacks) - len(filtered_callbacks)
278 if removed_count:
279 self._callbacks[:] = filtered_callbacks
280 return removed_count
281
282 # So-called internal methods (note: no set_running_or_notify_cancel()).
283
284 def set_result(self, result):
285 """Mark the future done and set its result.
286
287 If the future is already done when this method is called, raises
288 InvalidStateError.
289 """
290 if self._state != _PENDING:
291 raise InvalidStateError('{}: {!r}'.format(self._state, self))
292 self._result = result
293 self._state = _FINISHED
294 self._schedule_callbacks()
295
296 def set_exception(self, exception):
297 """Mark the future done and set an exception.
298
299 If the future is already done when this method is called, raises
300 InvalidStateError.
301 """
302 if self._state != _PENDING:
303 raise InvalidStateError('{}: {!r}'.format(self._state, self))
304 self._exception = exception
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700305 self._state = _FINISHED
306 self._schedule_callbacks()
Victor Stinner4c3c6992013-12-19 22:42:40 +0100307 if _PY34:
Victor Stinnere40c0782013-12-21 00:19:33 +0100308 self._log_traceback = True
Victor Stinner4c3c6992013-12-19 22:42:40 +0100309 else:
310 self._tb_logger = _TracebackLogger(exception)
311 # Arrange for the logger to be activated after all callbacks
312 # have had a chance to call result() or exception().
313 self._loop.call_soon(self._tb_logger.activate)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700314
315 # Truly internal methods.
316
317 def _copy_state(self, other):
318 """Internal helper to copy state from another Future.
319
320 The other Future may be a concurrent.futures.Future.
321 """
322 assert other.done()
Guido van Rossum7a465642013-11-22 11:47:22 -0800323 if self.cancelled():
324 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700325 assert not self.done()
326 if other.cancelled():
327 self.cancel()
328 else:
329 exception = other.exception()
330 if exception is not None:
331 self.set_exception(exception)
332 else:
333 result = other.result()
334 self.set_result(result)
335
336 def __iter__(self):
337 if not self.done():
338 self._blocking = True
339 yield self # This tells Task to wait for completion.
340 assert self.done(), "yield from wasn't used with future"
341 return self.result() # May raise too.
342
343
344def wrap_future(fut, *, loop=None):
345 """Wrap concurrent.futures.Future object."""
346 if isinstance(fut, Future):
347 return fut
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700348 assert isinstance(fut, concurrent.futures.Future), \
349 'concurrent.futures.Future is expected, got {!r}'.format(fut)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700350 if loop is None:
351 loop = events.get_event_loop()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700352 new_future = Future(loop=loop)
Guido van Rossum7a465642013-11-22 11:47:22 -0800353
354 def _check_cancel_other(f):
355 if f.cancelled():
356 fut.cancel()
357
358 new_future.add_done_callback(_check_cancel_other)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700359 fut.add_done_callback(
360 lambda future: loop.call_soon_threadsafe(
361 new_future._copy_state, fut))
362 return new_future