blob: 4edd2e5059f96765e29aa0de752e0288127477e0 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""A Future class similar to the one in PEP 3148."""
2
3__all__ = ['CancelledError', 'TimeoutError',
4 'InvalidStateError',
5 'Future', 'wrap_future',
6 ]
7
8import concurrent.futures._base
9import logging
Victor Stinner4c3c6992013-12-19 22:42:40 +010010import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011import traceback
12
13from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014
15# States for Future.
16_PENDING = 'PENDING'
17_CANCELLED = 'CANCELLED'
18_FINISHED = 'FINISHED'
19
Victor Stinner4c3c6992013-12-19 22:42:40 +010020_PY34 = sys.version_info >= (3, 4)
21
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022# TODO: Do we really want to depend on concurrent.futures internals?
23Error = concurrent.futures._base.Error
24CancelledError = concurrent.futures.CancelledError
25TimeoutError = concurrent.futures.TimeoutError
26
27STACK_DEBUG = logging.DEBUG - 1 # heavy-duty debugging
28
29
30class InvalidStateError(Error):
31 """The operation is not allowed in this state."""
32 # TODO: Show the future, its state, the method, and the required state.
33
34
35class _TracebackLogger:
36 """Helper to log a traceback upon destruction if not cleared.
37
38 This solves a nasty problem with Futures and Tasks that have an
39 exception set: if nobody asks for the exception, the exception is
40 never logged. This violates the Zen of Python: 'Errors should
41 never pass silently. Unless explicitly silenced.'
42
43 However, we don't want to log the exception as soon as
44 set_exception() is called: if the calling code is written
45 properly, it will get the exception and handle it properly. But
46 we *do* want to log it if result() or exception() was never called
47 -- otherwise developers waste a lot of time wondering why their
48 buggy code fails silently.
49
50 An earlier attempt added a __del__() method to the Future class
51 itself, but this backfired because the presence of __del__()
52 prevents garbage collection from breaking cycles. A way out of
53 this catch-22 is to avoid having a __del__() method on the Future
54 class itself, but instead to have a reference to a helper object
55 with a __del__() method that logs the traceback, where we ensure
56 that the helper object doesn't participate in cycles, and only the
57 Future has a reference to it.
58
59 The helper object is added when set_exception() is called. When
60 the Future is collected, and the helper is present, the helper
61 object is also collected, and its __del__() method will log the
62 traceback. When the Future's result() or exception() method is
63 called (and a helper object is present), it removes the the helper
64 object, after calling its clear() method to prevent it from
65 logging.
66
67 One downside is that we do a fair amount of work to extract the
68 traceback from the exception, even when it is never logged. It
69 would seem cheaper to just store the exception object, but that
70 references the traceback, which references stack frames, which may
71 reference the Future, which references the _TracebackLogger, and
72 then the _TracebackLogger would be included in a cycle, which is
73 what we're trying to avoid! As an optimization, we don't
74 immediately format the exception; we only do the work when
75 activate() is called, which call is delayed until after all the
76 Future's callbacks have run. Since usually a Future has at least
77 one callback (typically set by 'yield from') and usually that
78 callback extracts the callback, thereby removing the need to
79 format the exception.
80
81 PS. I don't claim credit for this solution. I first heard of it
82 in a discussion about closing files when they are collected.
83 """
84
Yury Selivanov569efa22014-02-18 18:02:19 -050085 __slots__ = ['exc', 'tb', 'loop']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070086
Yury Selivanov569efa22014-02-18 18:02:19 -050087 def __init__(self, exc, loop):
88 self.loop = loop
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070089 self.exc = exc
90 self.tb = None
91
92 def activate(self):
93 exc = self.exc
94 if exc is not None:
95 self.exc = None
96 self.tb = traceback.format_exception(exc.__class__, exc,
97 exc.__traceback__)
98
99 def clear(self):
100 self.exc = None
101 self.tb = None
102
103 def __del__(self):
104 if self.tb:
Yury Selivanov569efa22014-02-18 18:02:19 -0500105 msg = 'Future/Task exception was never retrieved:\n{tb}'
106 context = {
107 'message': msg.format(tb=''.join(self.tb)),
108 }
109 self.loop.call_exception_handler(context)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700110
111
112class Future:
113 """This class is *almost* compatible with concurrent.futures.Future.
114
115 Differences:
116
117 - result() and exception() do not take a timeout argument and
118 raise an exception when the future isn't done yet.
119
120 - Callbacks registered with add_done_callback() are always called
121 via the event loop's call_soon_threadsafe().
122
123 - This class is not compatible with the wait() and as_completed()
124 methods in the concurrent.futures package.
125
126 (In Python 3.4 or later we may be able to unify the implementations.)
127 """
128
129 # Class variables serving as defaults for instance variables.
130 _state = _PENDING
131 _result = None
132 _exception = None
133 _loop = None
134
135 _blocking = False # proper use of future (yield vs yield from)
136
Victor Stinnere40c0782013-12-21 00:19:33 +0100137 _log_traceback = False # Used for Python 3.4 and later
138 _tb_logger = None # Used for Python 3.3 only
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700139
140 def __init__(self, *, loop=None):
141 """Initialize the future.
142
143 The optional event_loop argument allows to explicitly set the event
144 loop object used by the future. If it's not provided, the future uses
145 the default event loop.
146 """
147 if loop is None:
148 self._loop = events.get_event_loop()
149 else:
150 self._loop = loop
151 self._callbacks = []
152
153 def __repr__(self):
154 res = self.__class__.__name__
155 if self._state == _FINISHED:
156 if self._exception is not None:
157 res += '<exception={!r}>'.format(self._exception)
158 else:
159 res += '<result={!r}>'.format(self._result)
160 elif self._callbacks:
161 size = len(self._callbacks)
162 if size > 2:
163 res += '<{}, [{}, <{} more>, {}]>'.format(
164 self._state, self._callbacks[0],
165 size-2, self._callbacks[-1])
166 else:
167 res += '<{}, {}>'.format(self._state, self._callbacks)
168 else:
169 res += '<{}>'.format(self._state)
170 return res
171
Victor Stinnera02f81f2014-06-24 22:37:53 +0200172 # On Python 3.3 or older, objects with a destructor part of a reference
173 # cycle are never destroyed. It's not more the case on Python 3.4 thanks to
174 # the PEP 442.
Victor Stinner4c3c6992013-12-19 22:42:40 +0100175 if _PY34:
176 def __del__(self):
Victor Stinnere40c0782013-12-21 00:19:33 +0100177 if not self._log_traceback:
178 # set_exception() was not called, or result() or exception()
179 # has consumed the exception
180 return
181 exc = self._exception
Yury Selivanov569efa22014-02-18 18:02:19 -0500182 context = {
183 'message': 'Future/Task exception was never retrieved',
184 'exception': exc,
185 'future': self,
186 }
187 self._loop.call_exception_handler(context)
Victor Stinner4c3c6992013-12-19 22:42:40 +0100188
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700189 def cancel(self):
190 """Cancel the future and schedule callbacks.
191
192 If the future is already done or cancelled, return False. Otherwise,
193 change the future's state to cancelled, schedule the callbacks and
194 return True.
195 """
196 if self._state != _PENDING:
197 return False
198 self._state = _CANCELLED
199 self._schedule_callbacks()
200 return True
201
202 def _schedule_callbacks(self):
203 """Internal: Ask the event loop to call all callbacks.
204
205 The callbacks are scheduled to be called as soon as possible. Also
206 clears the callback list.
207 """
208 callbacks = self._callbacks[:]
209 if not callbacks:
210 return
211
212 self._callbacks[:] = []
213 for callback in callbacks:
214 self._loop.call_soon(callback, self)
215
216 def cancelled(self):
217 """Return True if the future was cancelled."""
218 return self._state == _CANCELLED
219
220 # Don't implement running(); see http://bugs.python.org/issue18699
221
222 def done(self):
223 """Return True if the future is done.
224
225 Done means either that a result / exception are available, or that the
226 future was cancelled.
227 """
228 return self._state != _PENDING
229
230 def result(self):
231 """Return the result this future represents.
232
233 If the future has been cancelled, raises CancelledError. If the
234 future's result isn't yet available, raises InvalidStateError. If
235 the future is done and has an exception set, this exception is raised.
236 """
237 if self._state == _CANCELLED:
238 raise CancelledError
239 if self._state != _FINISHED:
240 raise InvalidStateError('Result is not ready.')
Victor Stinnere40c0782013-12-21 00:19:33 +0100241 self._log_traceback = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700242 if self._tb_logger is not None:
243 self._tb_logger.clear()
Victor Stinnere40c0782013-12-21 00:19:33 +0100244 self._tb_logger = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700245 if self._exception is not None:
246 raise self._exception
247 return self._result
248
249 def exception(self):
250 """Return the exception that was set on this future.
251
252 The exception (or None if no exception was set) is returned only if
253 the future is done. If the future has been cancelled, raises
254 CancelledError. If the future isn't done yet, raises
255 InvalidStateError.
256 """
257 if self._state == _CANCELLED:
258 raise CancelledError
259 if self._state != _FINISHED:
260 raise InvalidStateError('Exception is not set.')
Victor Stinnere40c0782013-12-21 00:19:33 +0100261 self._log_traceback = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700262 if self._tb_logger is not None:
263 self._tb_logger.clear()
Victor Stinnere40c0782013-12-21 00:19:33 +0100264 self._tb_logger = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700265 return self._exception
266
267 def add_done_callback(self, fn):
268 """Add a callback to be run when the future becomes done.
269
270 The callback is called with a single argument - the future object. If
271 the future is already done when this is called, the callback is
272 scheduled with call_soon.
273 """
274 if self._state != _PENDING:
275 self._loop.call_soon(fn, self)
276 else:
277 self._callbacks.append(fn)
278
279 # New method not in PEP 3148.
280
281 def remove_done_callback(self, fn):
282 """Remove all instances of a callback from the "call when done" list.
283
284 Returns the number of callbacks removed.
285 """
286 filtered_callbacks = [f for f in self._callbacks if f != fn]
287 removed_count = len(self._callbacks) - len(filtered_callbacks)
288 if removed_count:
289 self._callbacks[:] = filtered_callbacks
290 return removed_count
291
292 # So-called internal methods (note: no set_running_or_notify_cancel()).
293
294 def set_result(self, result):
295 """Mark the future done and set its result.
296
297 If the future is already done when this method is called, raises
298 InvalidStateError.
299 """
300 if self._state != _PENDING:
301 raise InvalidStateError('{}: {!r}'.format(self._state, self))
302 self._result = result
303 self._state = _FINISHED
304 self._schedule_callbacks()
305
306 def set_exception(self, exception):
307 """Mark the future done and set an exception.
308
309 If the future is already done when this method is called, raises
310 InvalidStateError.
311 """
312 if self._state != _PENDING:
313 raise InvalidStateError('{}: {!r}'.format(self._state, self))
Victor Stinner95728982014-01-30 16:01:54 -0800314 if isinstance(exception, type):
315 exception = exception()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700316 self._exception = exception
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700317 self._state = _FINISHED
318 self._schedule_callbacks()
Victor Stinner4c3c6992013-12-19 22:42:40 +0100319 if _PY34:
Victor Stinnere40c0782013-12-21 00:19:33 +0100320 self._log_traceback = True
Victor Stinner4c3c6992013-12-19 22:42:40 +0100321 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500322 self._tb_logger = _TracebackLogger(exception, self._loop)
Victor Stinner4c3c6992013-12-19 22:42:40 +0100323 # Arrange for the logger to be activated after all callbacks
324 # have had a chance to call result() or exception().
325 self._loop.call_soon(self._tb_logger.activate)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700326
327 # Truly internal methods.
328
329 def _copy_state(self, other):
330 """Internal helper to copy state from another Future.
331
332 The other Future may be a concurrent.futures.Future.
333 """
334 assert other.done()
Guido van Rossum7a465642013-11-22 11:47:22 -0800335 if self.cancelled():
336 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700337 assert not self.done()
338 if other.cancelled():
339 self.cancel()
340 else:
341 exception = other.exception()
342 if exception is not None:
343 self.set_exception(exception)
344 else:
345 result = other.result()
346 self.set_result(result)
347
348 def __iter__(self):
349 if not self.done():
350 self._blocking = True
351 yield self # This tells Task to wait for completion.
352 assert self.done(), "yield from wasn't used with future"
353 return self.result() # May raise too.
354
355
356def wrap_future(fut, *, loop=None):
357 """Wrap concurrent.futures.Future object."""
358 if isinstance(fut, Future):
359 return fut
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700360 assert isinstance(fut, concurrent.futures.Future), \
361 'concurrent.futures.Future is expected, got {!r}'.format(fut)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700362 if loop is None:
363 loop = events.get_event_loop()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700364 new_future = Future(loop=loop)
Guido van Rossum7a465642013-11-22 11:47:22 -0800365
366 def _check_cancel_other(f):
367 if f.cancelled():
368 fut.cancel()
369
370 new_future.add_done_callback(_check_cancel_other)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700371 fut.add_done_callback(
372 lambda future: loop.call_soon_threadsafe(
373 new_future._copy_state, fut))
374 return new_future