"""A Future class similar to the one in PEP 3148."""
2
# Public names exported by a star-import of this module.
__all__ = ['CancelledError', 'TimeoutError',
           'InvalidStateError',
           'Future', 'wrap_future',
           ]
7
8import concurrent.futures._base
9import logging
Victor Stinner313a9802014-07-29 12:58:23 +020010import reprlib
Victor Stinner4c3c6992013-12-19 22:42:40 +010011import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import traceback
13
14from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015
# States for Future.
_PENDING = 'PENDING'
_CANCELLED = 'CANCELLED'
_FINISHED = 'FINISHED'

# True on Python 3.4 and later; selects how never-retrieved exceptions
# are reported (Future.__del__ vs. the _TracebackLogger helper).
_PY34 = sys.version_info >= (3, 4)

# TODO: Do we really want to depend on concurrent.futures internals?
Error = concurrent.futures._base.Error
CancelledError = concurrent.futures.CancelledError
TimeoutError = concurrent.futures.TimeoutError

STACK_DEBUG = logging.DEBUG - 1  # heavy-duty debugging
29
30
class InvalidStateError(Error):
    """The operation is not allowed in this state."""
    # TODO: Show the future, its state, the method, and the required state.
34
35
class _TracebackLogger:
    """Helper to log a traceback upon destruction if not cleared.

    This solves a nasty problem with Futures and Tasks that have an
    exception set: if nobody asks for the exception, the exception is
    never logged.  This violates the Zen of Python: 'Errors should
    never pass silently.  Unless explicitly silenced.'

    However, we don't want to log the exception as soon as
    set_exception() is called: if the calling code is written
    properly, it will get the exception and handle it properly.  But
    we *do* want to log it if result() or exception() was never called
    -- otherwise developers waste a lot of time wondering why their
    buggy code fails silently.

    An earlier attempt added a __del__() method to the Future class
    itself, but this backfired because the presence of __del__()
    prevents garbage collection from breaking cycles.  A way out of
    this catch-22 is to avoid having a __del__() method on the Future
    class itself, but instead to have a reference to a helper object
    with a __del__() method that logs the traceback, where we ensure
    that the helper object doesn't participate in cycles, and only the
    Future has a reference to it.

    The helper object is added when set_exception() is called.  When
    the Future is collected, and the helper is present, the helper
    object is also collected, and its __del__() method will log the
    traceback.  When the Future's result() or exception() method is
    called (and a helper object is present), it removes the helper
    object, after calling its clear() method to prevent it from
    logging.

    One downside is that we do a fair amount of work to extract the
    traceback from the exception, even when it is never logged.  It
    would seem cheaper to just store the exception object, but that
    references the traceback, which references stack frames, which may
    reference the Future, which references the _TracebackLogger, and
    then the _TracebackLogger would be included in a cycle, which is
    what we're trying to avoid!  As an optimization, we don't
    immediately format the exception; we only do the work when
    activate() is called, which call is delayed until after all the
    Future's callbacks have run.  Usually a Future has at least
    one callback (typically set by 'yield from') and usually that
    callback extracts the exception, thereby removing the need to
    format the exception.

    PS. I don't claim credit for this solution.  I first heard of it
    in a discussion about closing files when they are collected.
    """

    __slots__ = ('loop', 'source_traceback', 'exc', 'tb')

    def __init__(self, future, exc):
        """Remember the future's loop and creation traceback, and *exc*.

        Only the future's ``_loop`` and ``_source_traceback`` attributes
        are stored, not the future itself.
        """
        self.loop = future._loop
        self.source_traceback = future._source_traceback
        self.exc = exc
        self.tb = None

    def activate(self):
        """Format the stored exception into ``tb`` and drop the exception.

        Does nothing if the exception was already formatted or cleared.
        """
        exc = self.exc
        if exc is not None:
            # Release the exception object before formatting so that only
            # the formatted text is kept.
            self.exc = None
            self.tb = traceback.format_exception(exc.__class__, exc,
                                                 exc.__traceback__)

    def clear(self):
        """Forget the exception and its formatted traceback.

        After this call __del__() reports nothing.
        """
        self.exc = None
        self.tb = None

    def __del__(self):
        """Report the formatted traceback, if any, to the loop's handler."""
        if self.tb:
            msg = 'Future/Task exception was never retrieved\n'
            if self.source_traceback:
                src = ''.join(traceback.format_list(self.source_traceback))
                msg += 'Future/Task created at (most recent call last):\n'
                msg += '%s\n' % src.rstrip()
            msg += ''.join(self.tb).rstrip()
            self.loop.call_exception_handler({'message': msg})
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700114
115
class Future:
    """This class is *almost* compatible with concurrent.futures.Future.

    Differences:

    - result() and exception() do not take a timeout argument and
      raise an exception when the future isn't done yet.

    - Callbacks registered with add_done_callback() are always called
      via the event loop's call_soon().

    - This class is not compatible with the wait() and as_completed()
      methods in the concurrent.futures package.

    (In Python 3.4 or later we may be able to unify the implementations.)
    """

    # Class variables serving as defaults for instance variables.
    _state = _PENDING
    _result = None
    _exception = None
    _loop = None

    _blocking = False  # proper use of future (yield vs yield from)

    _log_traceback = False   # Used for Python 3.4 and later
    _tb_logger = None        # Used for Python 3.3 only

    def __init__(self, *, loop=None):
        """Initialize the future.

        The optional loop argument allows to explicitly set the event
        loop object used by the future. If it's not provided, the future
        uses the loop returned by events.get_event_loop().
        """
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._callbacks = []
        if self._loop.get_debug():
            # In debug mode, remember where the future was created so the
            # location can be shown in repr() and in error reports.
            self._source_traceback = traceback.extract_stack(sys._getframe(1))
        else:
            self._source_traceback = None

    def _format_callbacks(self):
        """Return a 'cb=[...]' summary of the registered callbacks.

        At most the first and the last callback are formatted; with more
        than two callbacks the ones in between are shown as a count.
        """
        cb = self._callbacks
        size = len(cb)
        if not size:
            cb = ''

        def format_cb(callback):
            return events._format_callback(callback, ())

        if size == 1:
            cb = format_cb(cb[0])
        elif size == 2:
            cb = '{}, {}'.format(format_cb(cb[0]), format_cb(cb[1]))
        elif size > 2:
            cb = '{}, <{} more>, {}'.format(format_cb(cb[0]),
                                            size-2,
                                            format_cb(cb[-1]))
        return 'cb=[%s]' % cb

    def _repr_info(self):
        """Return the list of strings that __repr__() joins together.

        The list holds the lower-cased state, then (when finished) the
        exception or the result, then the callbacks and the creation
        location when they are available.
        """
        info = [self._state.lower()]
        if self._state == _FINISHED:
            if self._exception is not None:
                info.append('exception={!r}'.format(self._exception))
            else:
                # use reprlib to limit the length of the output, especially
                # for very long strings
                result = reprlib.repr(self._result)
                info.append('result={}'.format(result))
        if self._callbacks:
            info.append(self._format_callbacks())
        if self._source_traceback:
            # The last entry is the frame that created the future.
            frame = self._source_traceback[-1]
            info.append('created at %s:%s' % (frame[0], frame[1]))
        return info

    def __repr__(self):
        """Return '<ClassName info ...>' built from _repr_info()."""
        info = self._repr_info()
        return '<%s %s>' % (self.__class__.__name__, ' '.join(info))

    # On Python 3.3 and older, objects with a destructor that are part of a
    # reference cycle are never destroyed. This is no longer the case on
    # Python 3.4, thanks to PEP 442.
    if _PY34:
        def __del__(self):
            """Report an exception that was set but never retrieved."""
            if not self._log_traceback:
                # set_exception() was not called, or result() or exception()
                # has consumed the exception
                return
            exc = self._exception
            context = {
                'message': ('%s exception was never retrieved'
                            % self.__class__.__name__),
                'exception': exc,
                'future': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)

    def cancel(self):
        """Cancel the future and schedule callbacks.

        If the future is already done or cancelled, return False.  Otherwise,
        change the future's state to cancelled, schedule the callbacks and
        return True.
        """
        if self._state != _PENDING:
            return False
        self._state = _CANCELLED
        self._schedule_callbacks()
        return True

    def _schedule_callbacks(self):
        """Internal: Ask the event loop to call all callbacks.

        The callbacks are scheduled to be called as soon as possible. Also
        clears the callback list.
        """
        # Work on a copy: the list itself is emptied in place below.
        callbacks = self._callbacks[:]
        if not callbacks:
            return

        self._callbacks[:] = []
        for callback in callbacks:
            self._loop.call_soon(callback, self)

    def cancelled(self):
        """Return True if the future was cancelled."""
        return self._state == _CANCELLED

    # Don't implement running(); see http://bugs.python.org/issue18699

    def done(self):
        """Return True if the future is done.

        Done means either that a result / exception are available, or that the
        future was cancelled.
        """
        return self._state != _PENDING

    def result(self):
        """Return the result this future represents.

        If the future has been cancelled, raises CancelledError.  If the
        future's result isn't yet available, raises InvalidStateError.  If
        the future is done and has an exception set, this exception is raised.
        """
        if self._state == _CANCELLED:
            raise CancelledError
        if self._state != _FINISHED:
            raise InvalidStateError('Result is not ready.')
        # The outcome is being retrieved: disable the "never retrieved"
        # report, whichever mechanism is in use.
        self._log_traceback = False
        if self._tb_logger is not None:
            self._tb_logger.clear()
            self._tb_logger = None
        if self._exception is not None:
            raise self._exception
        return self._result

    def exception(self):
        """Return the exception that was set on this future.

        The exception (or None if no exception was set) is returned only if
        the future is done.  If the future has been cancelled, raises
        CancelledError.  If the future isn't done yet, raises
        InvalidStateError.
        """
        if self._state == _CANCELLED:
            raise CancelledError
        if self._state != _FINISHED:
            raise InvalidStateError('Exception is not set.')
        # The outcome is being retrieved: disable the "never retrieved"
        # report, whichever mechanism is in use.
        self._log_traceback = False
        if self._tb_logger is not None:
            self._tb_logger.clear()
            self._tb_logger = None
        return self._exception

    def add_done_callback(self, fn):
        """Add a callback to be run when the future becomes done.

        The callback is called with a single argument - the future object. If
        the future is already done when this is called, the callback is
        scheduled with call_soon.
        """
        if self._state != _PENDING:
            self._loop.call_soon(fn, self)
        else:
            self._callbacks.append(fn)

    # New method not in PEP 3148.

    def remove_done_callback(self, fn):
        """Remove all instances of a callback from the "call when done" list.

        Returns the number of callbacks removed.
        """
        filtered_callbacks = [f for f in self._callbacks if f != fn]
        removed_count = len(self._callbacks) - len(filtered_callbacks)
        if removed_count:
            self._callbacks[:] = filtered_callbacks
        return removed_count

    # So-called internal methods (note: no set_running_or_notify_cancel()).

    def _set_result_unless_cancelled(self, result):
        """Helper setting the result only if the future was not cancelled."""
        if self.cancelled():
            return
        self.set_result(result)

    def set_result(self, result):
        """Mark the future done and set its result.

        If the future is already done when this method is called, raises
        InvalidStateError.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = _FINISHED
        self._schedule_callbacks()

    def set_exception(self, exception):
        """Mark the future done and set an exception.

        If the future is already done when this method is called, raises
        InvalidStateError.  An exception class is accepted as well; it is
        instantiated without arguments.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        if isinstance(exception, type):
            exception = exception()
        self._exception = exception
        self._state = _FINISHED
        self._schedule_callbacks()
        if _PY34:
            self._log_traceback = True
        else:
            self._tb_logger = _TracebackLogger(self, exception)
            # Arrange for the logger to be activated after all callbacks
            # have had a chance to call result() or exception().
            self._loop.call_soon(self._tb_logger.activate)

    # Truly internal methods.

    def _copy_state(self, other):
        """Internal helper to copy state from another Future.

        The other Future may be a concurrent.futures.Future.
        """
        assert other.done()
        if self.cancelled():
            # This future was cancelled in the meantime; nothing to copy.
            return
        assert not self.done()
        if other.cancelled():
            self.cancel()
        else:
            exception = other.exception()
            if exception is not None:
                self.set_exception(exception)
            else:
                result = other.result()
                self.set_result(result)

    def __iter__(self):
        """Support 'yield from future': wait until done, return the result."""
        if not self.done():
            self._blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.
392
393
def wrap_future(fut, *, loop=None):
    """Wrap concurrent.futures.Future object.

    If *fut* is already a Future of this module it is returned unchanged.
    Otherwise a new Future bound to *loop* (or to the loop returned by
    events.get_event_loop() when *loop* is None) is returned.  When *fut*
    completes, its state is copied to the new future through the loop's
    call_soon_threadsafe(); cancelling the new future cancels *fut*.
    """
    if isinstance(fut, Future):
        return fut
    assert isinstance(fut, concurrent.futures.Future), \
        'concurrent.futures.Future is expected, got {!r}'.format(fut)
    if loop is None:
        loop = events.get_event_loop()
    new_future = Future(loop=loop)

    def _check_cancel_other(f):
        # Propagate cancellation of the wrapper to the wrapped future.
        if f.cancelled():
            fut.cancel()

    new_future.add_done_callback(_check_cancel_other)
    # The state is copied via call_soon_threadsafe() rather than directly
    # from within the concurrent future's completion callback.
    fut.add_done_callback(
        lambda future: loop.call_soon_threadsafe(
            new_future._copy_state, future))
    return new_future