blob: 1be3117ebbe62270daeaeb4010e7718eefa35fc8 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""A Future class similar to the one in PEP 3148."""
2
# Public names exported by ``from <this module> import *``.
__all__ = [
    'CancelledError',
    'TimeoutError',
    'InvalidStateError',
    'Future',
    'wrap_future',
]
7
8import concurrent.futures._base
9import logging
Victor Stinner4c3c6992013-12-19 22:42:40 +010010import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011import traceback
12
13from . import events
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070014from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015
# The three states a Future can be in, kept as plain strings.
_PENDING, _CANCELLED, _FINISHED = 'PENDING', 'CANCELLED', 'FINISHED'

# True when the running interpreter is Python 3.4 or newer.
_PY34 = (3, 4) <= sys.version_info

# TODO: Do we really want to depend on concurrent.futures internals?
# The exception types are re-exported from concurrent.futures so that both
# packages share the same classes.
Error = concurrent.futures._base.Error
CancelledError = concurrent.futures.CancelledError
TimeoutError = concurrent.futures.TimeoutError

# Custom logging level sitting one step below DEBUG.
STACK_DEBUG = logging.DEBUG - 1  # heavy-duty debugging
29
30
class InvalidStateError(Error):
    """Raised when an operation is not permitted in the current state."""
    # TODO: Show the future, its state, the method, and the required state.
34
35
class _TracebackLogger:
    """Helper that logs a traceback when destroyed, unless it was cleared.

    Futures and Tasks that have an exception set suffer from a nasty
    problem: when nobody ever asks for the exception, it is never
    logged.  That violates the Zen of Python: 'Errors should never pass
    silently.  Unless explicitly silenced.'

    Logging right away in set_exception() is not the answer either:
    well-written calling code will fetch the exception and deal with it.
    What we *do* want is a log entry when result() or exception() was
    never called -- otherwise developers lose a lot of time wondering
    why their buggy code fails silently.

    An earlier attempt put a __del__() method on the Future class
    itself.  That backfired, because the presence of __del__() prevents
    garbage collection from breaking cycles.  The way out of this
    catch-22 is to keep __del__() off the Future and instead give the
    Future a reference to a helper object whose __del__() logs the
    traceback.  The helper must not take part in any cycle, and only
    the Future may hold a reference to it.

    The helper is attached when set_exception() is called.  When the
    Future is collected while the helper is still present, the helper
    is collected as well and its __del__() logs the traceback.  When
    the Future's result() or exception() method is called (and a helper
    is present), the helper is removed, after its clear() method has
    been called to stop it from logging.

    One downside is the fair amount of work needed to extract the
    traceback from the exception, even if it is never logged.  Storing
    just the exception object would seem cheaper, but the exception
    references the traceback, which references stack frames, which may
    reference the Future, which references the _TracebackLogger -- and
    then the _TracebackLogger would sit in a cycle, which is exactly
    what we are trying to avoid!  As an optimization the exception is
    not formatted immediately; the work is only done when activate() is
    called, and that call is delayed until after all the Future's
    callbacks have run.  A Future usually has at least one callback
    (typically set by 'yield from'), and usually that callback extracts
    the exception, thereby removing the need to format it.

    PS. I don't claim credit for this solution.  I first heard of it
    in a discussion about closing files when they are collected.
    """

    __slots__ = ['exc', 'tb']

    def __init__(self, exc):
        # Nothing is formatted yet; only the raw exception is remembered.
        self.tb = None
        self.exc = exc

    def activate(self):
        """Format the stored exception (once) and drop the reference to it."""
        pending = self.exc
        if pending is None:
            return
        # Release the exception before formatting so the helper stops
        # referencing it (and, through it, the traceback and frames).
        self.exc = None
        self.tb = traceback.format_exception(pending.__class__, pending,
                                             pending.__traceback__)

    def clear(self):
        """Forget both the exception and any formatted traceback."""
        self.exc = self.tb = None

    def __del__(self):
        # Only a formatted traceback that was never cleared gets logged.
        if not self.tb:
            return
        logger.error('Future/Task exception was never retrieved:\n%s',
                     ''.join(self.tb))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700107
108
class Future:
    """This class is *almost* compatible with concurrent.futures.Future.

    Differences:

    - result() and exception() do not take a timeout argument and
      raise an exception when the future isn't done yet.

    - Callbacks registered with add_done_callback() are always called
      via the event loop's call_soon().

    - This class is not compatible with the wait() and as_completed()
      methods in the concurrent.futures package.

    (In Python 3.4 or later we may be able to unify the implementations.)
    """

    # Class variables serving as defaults for instance variables.
    _state = _PENDING
    _result = None
    _exception = None
    _loop = None

    _blocking = False  # proper use of future (yield vs yield from)

    _traceback = None   # Used for Python 3.4 and later
    _tb_logger = None   # Used for Python 3.3 only

    def __init__(self, *, loop=None):
        """Initialize the future.

        The optional keyword-only loop argument allows to explicitly set
        the event loop object used by the future. If it's not provided,
        the future uses the default event loop, as returned by
        events.get_event_loop().
        """
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._callbacks = []

    def __repr__(self):
        """Return the class name followed by the state in angle brackets.

        A finished future shows its result or exception; an unfinished
        one shows its state and, if any are registered, its callbacks
        (abbreviated to the first and last when there are more than two).
        """
        res = self.__class__.__name__
        if self._state == _FINISHED:
            if self._exception is not None:
                res += '<exception={!r}>'.format(self._exception)
            else:
                res += '<result={!r}>'.format(self._result)
        elif self._callbacks:
            size = len(self._callbacks)
            if size > 2:
                res += '<{}, [{}, <{} more>, {}]>'.format(
                    self._state, self._callbacks[0],
                    size-2, self._callbacks[-1])
            else:
                res += '<{}, {}>'.format(self._state, self._callbacks)
        else:
            res += '<{}>'.format(self._state)
        return res

    if _PY34:
        def __del__(self):
            # A traceback that is still set here means neither result()
            # nor exception() was called after set_exception().
            if self._traceback is not None:
                logger.error('Future/Task exception was never retrieved:\n%s',
                             ''.join(self._traceback))

    def cancel(self):
        """Cancel the future and schedule callbacks.

        If the future is already done or cancelled, return False.  Otherwise,
        change the future's state to cancelled, schedule the callbacks and
        return True.
        """
        if self._state != _PENDING:
            return False
        self._state = _CANCELLED
        self._schedule_callbacks()
        return True

    def _schedule_callbacks(self):
        """Internal: Ask the event loop to call all callbacks.

        The callbacks are scheduled to be called as soon as possible. Also
        clears the callback list.
        """
        # Work on a copy so the list can be emptied in place before the
        # callbacks are handed to the loop.
        callbacks = self._callbacks[:]
        if not callbacks:
            return

        self._callbacks[:] = []
        for callback in callbacks:
            self._loop.call_soon(callback, self)

    def cancelled(self):
        """Return True if the future was cancelled."""
        return self._state == _CANCELLED

    # Don't implement running(); see http://bugs.python.org/issue18699

    def done(self):
        """Return True if the future is done.

        Done means either that a result / exception are available, or that the
        future was cancelled.
        """
        return self._state != _PENDING

    def result(self):
        """Return the result this future represents.

        If the future has been cancelled, raises CancelledError.  If the
        future's result isn't yet available, raises InvalidStateError.  If
        the future is done and has an exception set, this exception is raised.
        """
        if self._state == _CANCELLED:
            raise CancelledError
        if self._state != _FINISHED:
            raise InvalidStateError('Result is not ready.')
        # The outcome is being retrieved, so disarm both "never
        # retrieved" logging mechanisms.
        self._traceback = None
        if self._tb_logger is not None:
            self._tb_logger.clear()
            self._tb_logger = None
        if self._exception is not None:
            raise self._exception
        return self._result

    def exception(self):
        """Return the exception that was set on this future.

        The exception (or None if no exception was set) is returned only if
        the future is done.  If the future has been cancelled, raises
        CancelledError.  If the future isn't done yet, raises
        InvalidStateError.
        """
        if self._state == _CANCELLED:
            raise CancelledError
        if self._state != _FINISHED:
            raise InvalidStateError('Exception is not set.')
        # The outcome is being retrieved, so disarm both "never
        # retrieved" logging mechanisms.
        self._traceback = None
        if self._tb_logger is not None:
            self._tb_logger.clear()
            self._tb_logger = None
        return self._exception

    def add_done_callback(self, fn):
        """Add a callback to be run when the future becomes done.

        The callback is called with a single argument - the future object. If
        the future is already done when this is called, the callback is
        scheduled with call_soon.
        """
        if self._state != _PENDING:
            self._loop.call_soon(fn, self)
        else:
            self._callbacks.append(fn)

    # New method not in PEP 3148.

    def remove_done_callback(self, fn):
        """Remove all instances of a callback from the "call when done" list.

        Returns the number of callbacks removed.
        """
        filtered_callbacks = [f for f in self._callbacks if f != fn]
        removed_count = len(self._callbacks) - len(filtered_callbacks)
        if removed_count:
            self._callbacks[:] = filtered_callbacks
        return removed_count

    # So-called internal methods (note: no set_running_or_notify_cancel()).

    def set_result(self, result):
        """Mark the future done and set its result.

        If the future is already done when this method is called, raises
        InvalidStateError.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = _FINISHED
        self._schedule_callbacks()

    def set_exception(self, exception):
        """Mark the future done and set an exception.

        The exception may be given as an instance or as an exception
        class; a class is instantiated without arguments.

        If the future is already done when this method is called, raises
        InvalidStateError.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        if isinstance(exception, type):
            # Normalize a class to an instance up front: the code below
            # reads exception.__traceback__, which is only meaningful on
            # an instance, and result() re-raises the stored object.
            exception = exception()
        self._exception = exception
        self._state = _FINISHED
        self._schedule_callbacks()
        if _PY34:
            self._traceback = traceback.format_exception(
                exception.__class__,
                exception,
                exception.__traceback__)
        else:
            self._tb_logger = _TracebackLogger(exception)
            # Arrange for the logger to be activated after all callbacks
            # have had a chance to call result() or exception().
            self._loop.call_soon(self._tb_logger.activate)

    # Truly internal methods.

    def _copy_state(self, other):
        """Internal helper to copy state from another Future.

        The other Future may be a concurrent.futures.Future.
        """
        assert other.done()
        # Nothing to copy if this future was cancelled in the meantime.
        if self.cancelled():
            return
        assert not self.done()
        if other.cancelled():
            self.cancel()
        else:
            exception = other.exception()
            if exception is not None:
                self.set_exception(exception)
            else:
                result = other.result()
                self.set_result(result)

    def __iter__(self):
        """Support 'yield from future': yield self until done, then
        return (or raise) the outcome via result()."""
        if not self.done():
            self._blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.
341
342
def wrap_future(fut, *, loop=None):
    """Wrap concurrent.futures.Future object.

    If fut is already an instance of this module's Future it is returned
    unchanged.  Otherwise a new Future bound to loop (or to the loop from
    events.get_event_loop() when loop is None) is returned.  When fut
    completes, its state is copied into the new future through the loop's
    call_soon_threadsafe(); when the new future ends up cancelled, fut is
    cancelled as well.
    """
    if isinstance(fut, Future):
        return fut
    assert isinstance(fut, concurrent.futures.Future), \
        'concurrent.futures.Future is expected, got {!r}'.format(fut)

    target_loop = events.get_event_loop() if loop is None else loop
    wrapper = Future(loop=target_loop)

    def _forward_cancel(done_future):
        # Propagate cancellation of the wrapper back to the wrapped future.
        if done_future.cancelled():
            fut.cancel()

    def _forward_state(future):
        # Hand the state copy over to the event loop via its
        # thread-safe scheduling call.
        return target_loop.call_soon_threadsafe(wrapper._copy_state, fut)

    wrapper.add_done_callback(_forward_cancel)
    fut.add_done_callback(_forward_state)
    return wrapper