blob: b9cd45c786dbe230c8531c45ed87f272c54f8ced [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""A Future class similar to the one in PEP 3148."""
2
3__all__ = ['CancelledError', 'TimeoutError',
4 'InvalidStateError',
5 'Future', 'wrap_future',
6 ]
7
8import concurrent.futures._base
9import logging
Victor Stinner4c3c6992013-12-19 22:42:40 +010010import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011import traceback
12
13from . import events
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070014from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015
16# States for Future.
17_PENDING = 'PENDING'
18_CANCELLED = 'CANCELLED'
19_FINISHED = 'FINISHED'
20
Victor Stinner4c3c6992013-12-19 22:42:40 +010021_PY34 = sys.version_info >= (3, 4)
22
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023# TODO: Do we really want to depend on concurrent.futures internals?
24Error = concurrent.futures._base.Error
25CancelledError = concurrent.futures.CancelledError
26TimeoutError = concurrent.futures.TimeoutError
27
28STACK_DEBUG = logging.DEBUG - 1 # heavy-duty debugging
29
30
31class InvalidStateError(Error):
32 """The operation is not allowed in this state."""
33 # TODO: Show the future, its state, the method, and the required state.
34
35
36class _TracebackLogger:
37 """Helper to log a traceback upon destruction if not cleared.
38
39 This solves a nasty problem with Futures and Tasks that have an
40 exception set: if nobody asks for the exception, the exception is
41 never logged. This violates the Zen of Python: 'Errors should
42 never pass silently. Unless explicitly silenced.'
43
44 However, we don't want to log the exception as soon as
45 set_exception() is called: if the calling code is written
46 properly, it will get the exception and handle it properly. But
47 we *do* want to log it if result() or exception() was never called
48 -- otherwise developers waste a lot of time wondering why their
49 buggy code fails silently.
50
51 An earlier attempt added a __del__() method to the Future class
52 itself, but this backfired because the presence of __del__()
53 prevents garbage collection from breaking cycles. A way out of
54 this catch-22 is to avoid having a __del__() method on the Future
55 class itself, but instead to have a reference to a helper object
56 with a __del__() method that logs the traceback, where we ensure
57 that the helper object doesn't participate in cycles, and only the
58 Future has a reference to it.
59
60 The helper object is added when set_exception() is called. When
61 the Future is collected, and the helper is present, the helper
62 object is also collected, and its __del__() method will log the
63 traceback. When the Future's result() or exception() method is
64 called (and a helper object is present), it removes the the helper
65 object, after calling its clear() method to prevent it from
66 logging.
67
68 One downside is that we do a fair amount of work to extract the
69 traceback from the exception, even when it is never logged. It
70 would seem cheaper to just store the exception object, but that
71 references the traceback, which references stack frames, which may
72 reference the Future, which references the _TracebackLogger, and
73 then the _TracebackLogger would be included in a cycle, which is
74 what we're trying to avoid! As an optimization, we don't
75 immediately format the exception; we only do the work when
76 activate() is called, which call is delayed until after all the
77 Future's callbacks have run. Since usually a Future has at least
78 one callback (typically set by 'yield from') and usually that
79 callback extracts the callback, thereby removing the need to
80 format the exception.
81
82 PS. I don't claim credit for this solution. I first heard of it
83 in a discussion about closing files when they are collected.
84 """
85
Yury Selivanov569efa22014-02-18 18:02:19 -050086 __slots__ = ['exc', 'tb', 'loop']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070087
Yury Selivanov569efa22014-02-18 18:02:19 -050088 def __init__(self, exc, loop):
89 self.loop = loop
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070090 self.exc = exc
91 self.tb = None
92
93 def activate(self):
94 exc = self.exc
95 if exc is not None:
96 self.exc = None
97 self.tb = traceback.format_exception(exc.__class__, exc,
98 exc.__traceback__)
99
100 def clear(self):
101 self.exc = None
102 self.tb = None
103
104 def __del__(self):
105 if self.tb:
Yury Selivanov569efa22014-02-18 18:02:19 -0500106 msg = 'Future/Task exception was never retrieved:\n{tb}'
107 context = {
108 'message': msg.format(tb=''.join(self.tb)),
109 }
110 self.loop.call_exception_handler(context)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700111
112
113class Future:
114 """This class is *almost* compatible with concurrent.futures.Future.
115
116 Differences:
117
118 - result() and exception() do not take a timeout argument and
119 raise an exception when the future isn't done yet.
120
121 - Callbacks registered with add_done_callback() are always called
122 via the event loop's call_soon_threadsafe().
123
124 - This class is not compatible with the wait() and as_completed()
125 methods in the concurrent.futures package.
126
127 (In Python 3.4 or later we may be able to unify the implementations.)
128 """
129
130 # Class variables serving as defaults for instance variables.
131 _state = _PENDING
132 _result = None
133 _exception = None
134 _loop = None
135
136 _blocking = False # proper use of future (yield vs yield from)
137
Victor Stinnere40c0782013-12-21 00:19:33 +0100138 _log_traceback = False # Used for Python 3.4 and later
139 _tb_logger = None # Used for Python 3.3 only
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700140
141 def __init__(self, *, loop=None):
142 """Initialize the future.
143
144 The optional event_loop argument allows to explicitly set the event
145 loop object used by the future. If it's not provided, the future uses
146 the default event loop.
147 """
148 if loop is None:
149 self._loop = events.get_event_loop()
150 else:
151 self._loop = loop
152 self._callbacks = []
153
154 def __repr__(self):
155 res = self.__class__.__name__
156 if self._state == _FINISHED:
157 if self._exception is not None:
158 res += '<exception={!r}>'.format(self._exception)
159 else:
160 res += '<result={!r}>'.format(self._result)
161 elif self._callbacks:
162 size = len(self._callbacks)
163 if size > 2:
164 res += '<{}, [{}, <{} more>, {}]>'.format(
165 self._state, self._callbacks[0],
166 size-2, self._callbacks[-1])
167 else:
168 res += '<{}, {}>'.format(self._state, self._callbacks)
169 else:
170 res += '<{}>'.format(self._state)
171 return res
172
Victor Stinner4c3c6992013-12-19 22:42:40 +0100173 if _PY34:
174 def __del__(self):
Victor Stinnere40c0782013-12-21 00:19:33 +0100175 if not self._log_traceback:
176 # set_exception() was not called, or result() or exception()
177 # has consumed the exception
178 return
179 exc = self._exception
Yury Selivanov569efa22014-02-18 18:02:19 -0500180 context = {
181 'message': 'Future/Task exception was never retrieved',
182 'exception': exc,
183 'future': self,
184 }
185 self._loop.call_exception_handler(context)
Victor Stinner4c3c6992013-12-19 22:42:40 +0100186
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700187 def cancel(self):
188 """Cancel the future and schedule callbacks.
189
190 If the future is already done or cancelled, return False. Otherwise,
191 change the future's state to cancelled, schedule the callbacks and
192 return True.
193 """
194 if self._state != _PENDING:
195 return False
196 self._state = _CANCELLED
197 self._schedule_callbacks()
198 return True
199
200 def _schedule_callbacks(self):
201 """Internal: Ask the event loop to call all callbacks.
202
203 The callbacks are scheduled to be called as soon as possible. Also
204 clears the callback list.
205 """
206 callbacks = self._callbacks[:]
207 if not callbacks:
208 return
209
210 self._callbacks[:] = []
211 for callback in callbacks:
212 self._loop.call_soon(callback, self)
213
214 def cancelled(self):
215 """Return True if the future was cancelled."""
216 return self._state == _CANCELLED
217
218 # Don't implement running(); see http://bugs.python.org/issue18699
219
220 def done(self):
221 """Return True if the future is done.
222
223 Done means either that a result / exception are available, or that the
224 future was cancelled.
225 """
226 return self._state != _PENDING
227
228 def result(self):
229 """Return the result this future represents.
230
231 If the future has been cancelled, raises CancelledError. If the
232 future's result isn't yet available, raises InvalidStateError. If
233 the future is done and has an exception set, this exception is raised.
234 """
235 if self._state == _CANCELLED:
236 raise CancelledError
237 if self._state != _FINISHED:
238 raise InvalidStateError('Result is not ready.')
Victor Stinnere40c0782013-12-21 00:19:33 +0100239 self._log_traceback = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700240 if self._tb_logger is not None:
241 self._tb_logger.clear()
Victor Stinnere40c0782013-12-21 00:19:33 +0100242 self._tb_logger = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700243 if self._exception is not None:
244 raise self._exception
245 return self._result
246
247 def exception(self):
248 """Return the exception that was set on this future.
249
250 The exception (or None if no exception was set) is returned only if
251 the future is done. If the future has been cancelled, raises
252 CancelledError. If the future isn't done yet, raises
253 InvalidStateError.
254 """
255 if self._state == _CANCELLED:
256 raise CancelledError
257 if self._state != _FINISHED:
258 raise InvalidStateError('Exception is not set.')
Victor Stinnere40c0782013-12-21 00:19:33 +0100259 self._log_traceback = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700260 if self._tb_logger is not None:
261 self._tb_logger.clear()
Victor Stinnere40c0782013-12-21 00:19:33 +0100262 self._tb_logger = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700263 return self._exception
264
265 def add_done_callback(self, fn):
266 """Add a callback to be run when the future becomes done.
267
268 The callback is called with a single argument - the future object. If
269 the future is already done when this is called, the callback is
270 scheduled with call_soon.
271 """
272 if self._state != _PENDING:
273 self._loop.call_soon(fn, self)
274 else:
275 self._callbacks.append(fn)
276
277 # New method not in PEP 3148.
278
279 def remove_done_callback(self, fn):
280 """Remove all instances of a callback from the "call when done" list.
281
282 Returns the number of callbacks removed.
283 """
284 filtered_callbacks = [f for f in self._callbacks if f != fn]
285 removed_count = len(self._callbacks) - len(filtered_callbacks)
286 if removed_count:
287 self._callbacks[:] = filtered_callbacks
288 return removed_count
289
290 # So-called internal methods (note: no set_running_or_notify_cancel()).
291
292 def set_result(self, result):
293 """Mark the future done and set its result.
294
295 If the future is already done when this method is called, raises
296 InvalidStateError.
297 """
298 if self._state != _PENDING:
299 raise InvalidStateError('{}: {!r}'.format(self._state, self))
300 self._result = result
301 self._state = _FINISHED
302 self._schedule_callbacks()
303
304 def set_exception(self, exception):
305 """Mark the future done and set an exception.
306
307 If the future is already done when this method is called, raises
308 InvalidStateError.
309 """
310 if self._state != _PENDING:
311 raise InvalidStateError('{}: {!r}'.format(self._state, self))
Victor Stinner95728982014-01-30 16:01:54 -0800312 if isinstance(exception, type):
313 exception = exception()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700314 self._exception = exception
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700315 self._state = _FINISHED
316 self._schedule_callbacks()
Victor Stinner4c3c6992013-12-19 22:42:40 +0100317 if _PY34:
Victor Stinnere40c0782013-12-21 00:19:33 +0100318 self._log_traceback = True
Victor Stinner4c3c6992013-12-19 22:42:40 +0100319 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500320 self._tb_logger = _TracebackLogger(exception, self._loop)
Victor Stinner4c3c6992013-12-19 22:42:40 +0100321 # Arrange for the logger to be activated after all callbacks
322 # have had a chance to call result() or exception().
323 self._loop.call_soon(self._tb_logger.activate)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700324
325 # Truly internal methods.
326
327 def _copy_state(self, other):
328 """Internal helper to copy state from another Future.
329
330 The other Future may be a concurrent.futures.Future.
331 """
332 assert other.done()
Guido van Rossum7a465642013-11-22 11:47:22 -0800333 if self.cancelled():
334 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700335 assert not self.done()
336 if other.cancelled():
337 self.cancel()
338 else:
339 exception = other.exception()
340 if exception is not None:
341 self.set_exception(exception)
342 else:
343 result = other.result()
344 self.set_result(result)
345
346 def __iter__(self):
347 if not self.done():
348 self._blocking = True
349 yield self # This tells Task to wait for completion.
350 assert self.done(), "yield from wasn't used with future"
351 return self.result() # May raise too.
352
353
354def wrap_future(fut, *, loop=None):
355 """Wrap concurrent.futures.Future object."""
356 if isinstance(fut, Future):
357 return fut
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700358 assert isinstance(fut, concurrent.futures.Future), \
359 'concurrent.futures.Future is expected, got {!r}'.format(fut)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700360 if loop is None:
361 loop = events.get_event_loop()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700362 new_future = Future(loop=loop)
Guido van Rossum7a465642013-11-22 11:47:22 -0800363
364 def _check_cancel_other(f):
365 if f.cancelled():
366 fut.cancel()
367
368 new_future.add_done_callback(_check_cancel_other)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700369 fut.add_done_callback(
370 lambda future: loop.call_soon_threadsafe(
371 new_future._copy_state, fut))
372 return new_future