blob: db278386ea06e2ed74c1f8df2254dae9a3fff2b5 [file] [log] [blame]
"""A Future class similar to the one in PEP 3148."""
2
# Public names exported by this module.
__all__ = [
    'CancelledError',
    'TimeoutError',
    'InvalidStateError',
    'Future',
    'wrap_future',
]
7
8import concurrent.futures._base
9import logging
10import traceback
11
12from . import events
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070013from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014
# Lifecycle states a Future can be in.
_PENDING, _CANCELLED, _FINISHED = 'PENDING', 'CANCELLED', 'FINISHED'

# Exception types are borrowed from concurrent.futures.
# TODO: Do we really want to depend on concurrent.futures internals?
CancelledError = concurrent.futures.CancelledError
TimeoutError = concurrent.futures.TimeoutError
Error = concurrent.futures._base.Error

# Log level one step below DEBUG, reserved for heavy-duty debugging.
STACK_DEBUG = logging.DEBUG - 1
26
27
class InvalidStateError(Error):
    """Raised when an operation is not permitted in the current state."""
    # TODO: Show the future, its state, the method, and the required state.
31
32
33class _TracebackLogger:
34 """Helper to log a traceback upon destruction if not cleared.
35
36 This solves a nasty problem with Futures and Tasks that have an
37 exception set: if nobody asks for the exception, the exception is
38 never logged. This violates the Zen of Python: 'Errors should
39 never pass silently. Unless explicitly silenced.'
40
41 However, we don't want to log the exception as soon as
42 set_exception() is called: if the calling code is written
43 properly, it will get the exception and handle it properly. But
44 we *do* want to log it if result() or exception() was never called
45 -- otherwise developers waste a lot of time wondering why their
46 buggy code fails silently.
47
48 An earlier attempt added a __del__() method to the Future class
49 itself, but this backfired because the presence of __del__()
50 prevents garbage collection from breaking cycles. A way out of
51 this catch-22 is to avoid having a __del__() method on the Future
52 class itself, but instead to have a reference to a helper object
53 with a __del__() method that logs the traceback, where we ensure
54 that the helper object doesn't participate in cycles, and only the
55 Future has a reference to it.
56
57 The helper object is added when set_exception() is called. When
58 the Future is collected, and the helper is present, the helper
59 object is also collected, and its __del__() method will log the
60 traceback. When the Future's result() or exception() method is
61 called (and a helper object is present), it removes the the helper
62 object, after calling its clear() method to prevent it from
63 logging.
64
65 One downside is that we do a fair amount of work to extract the
66 traceback from the exception, even when it is never logged. It
67 would seem cheaper to just store the exception object, but that
68 references the traceback, which references stack frames, which may
69 reference the Future, which references the _TracebackLogger, and
70 then the _TracebackLogger would be included in a cycle, which is
71 what we're trying to avoid! As an optimization, we don't
72 immediately format the exception; we only do the work when
73 activate() is called, which call is delayed until after all the
74 Future's callbacks have run. Since usually a Future has at least
75 one callback (typically set by 'yield from') and usually that
76 callback extracts the callback, thereby removing the need to
77 format the exception.
78
79 PS. I don't claim credit for this solution. I first heard of it
80 in a discussion about closing files when they are collected.
81 """
82
83 __slots__ = ['exc', 'tb']
84
85 def __init__(self, exc):
86 self.exc = exc
87 self.tb = None
88
89 def activate(self):
90 exc = self.exc
91 if exc is not None:
92 self.exc = None
93 self.tb = traceback.format_exception(exc.__class__, exc,
94 exc.__traceback__)
95
96 def clear(self):
97 self.exc = None
98 self.tb = None
99
100 def __del__(self):
101 if self.tb:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700102 logger.error('Future/Task exception was never retrieved:\n%s',
103 ''.join(self.tb))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700104
105
class Future:
    """This class is *almost* compatible with concurrent.futures.Future.

    Differences:

    - result() and exception() do not take a timeout argument and
      raise an exception when the future isn't done yet.

    - Callbacks registered with add_done_callback() are always called
      via the event loop's call_soon().

    - This class is not compatible with the wait() and as_completed()
      methods in the concurrent.futures package.

    (In Python 3.4 or later we may be able to unify the implementations.)
    """

    # Class variables serving as defaults for instance variables.
    _state = _PENDING
    _result = None
    _exception = None
    _loop = None

    _blocking = False  # proper use of future (yield vs yield from)

    # Helper that logs an unretrieved exception; set by set_exception().
    _tb_logger = None

    def __init__(self, *, loop=None):
        """Initialize the future.

        The optional keyword-only loop argument allows to explicitly set
        the event loop object used by the future.  If it's not provided,
        the future uses the loop returned by events.get_event_loop().
        """
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._callbacks = []

    def __repr__(self):
        """Return the class name followed by the state, result or exception.

        For an unfinished future with callbacks, the callbacks are shown
        too; when there are more than two, only the first and last are
        shown together with a count of the ones in between.
        """
        res = self.__class__.__name__
        if self._state == _FINISHED:
            if self._exception is not None:
                res += '<exception={!r}>'.format(self._exception)
            else:
                res += '<result={!r}>'.format(self._result)
        elif self._callbacks:
            size = len(self._callbacks)
            if size > 2:
                res += '<{}, [{}, <{} more>, {}]>'.format(
                    self._state, self._callbacks[0],
                    size-2, self._callbacks[-1])
            else:
                res += '<{}, {}>'.format(self._state, self._callbacks)
        else:
            res += '<{}>'.format(self._state)
        return res

    def cancel(self):
        """Cancel the future and schedule callbacks.

        If the future is already done or cancelled, return False.  Otherwise,
        change the future's state to cancelled, schedule the callbacks and
        return True.
        """
        if self._state != _PENDING:
            return False
        self._state = _CANCELLED
        self._schedule_callbacks()
        return True

    def _schedule_callbacks(self):
        """Internal: Ask the event loop to call all callbacks.

        The callbacks are scheduled to be called as soon as possible.  Also
        clears the callback list.
        """
        # Work on a snapshot so the list can be emptied before scheduling.
        callbacks = self._callbacks[:]
        if not callbacks:
            return

        self._callbacks[:] = []
        for callback in callbacks:
            self._loop.call_soon(callback, self)

    def cancelled(self):
        """Return True if the future was cancelled."""
        return self._state == _CANCELLED

    # Don't implement running(); see http://bugs.python.org/issue18699

    def done(self):
        """Return True if the future is done.

        Done means either that a result / exception are available, or that the
        future was cancelled.
        """
        return self._state != _PENDING

    def result(self):
        """Return the result this future represents.

        If the future has been cancelled, raises CancelledError.  If the
        future's result isn't yet available, raises InvalidStateError.  If
        the future is done and has an exception set, this exception is raised.
        """
        if self._state == _CANCELLED:
            raise CancelledError
        if self._state != _FINISHED:
            raise InvalidStateError('Result is not ready.')
        # The outcome is being retrieved, so silence the traceback logger.
        if self._tb_logger is not None:
            self._tb_logger.clear()
            self._tb_logger = None
        if self._exception is not None:
            raise self._exception
        return self._result

    def exception(self):
        """Return the exception that was set on this future.

        The exception (or None if no exception was set) is returned only if
        the future is done.  If the future has been cancelled, raises
        CancelledError.  If the future isn't done yet, raises
        InvalidStateError.
        """
        if self._state == _CANCELLED:
            raise CancelledError
        if self._state != _FINISHED:
            raise InvalidStateError('Exception is not set.')
        # The outcome is being retrieved, so silence the traceback logger.
        if self._tb_logger is not None:
            self._tb_logger.clear()
            self._tb_logger = None
        return self._exception

    def add_done_callback(self, fn):
        """Add a callback to be run when the future becomes done.

        The callback is called with a single argument - the future object.  If
        the future is already done when this is called, the callback is
        scheduled with call_soon.
        """
        if self._state != _PENDING:
            self._loop.call_soon(fn, self)
        else:
            self._callbacks.append(fn)

    # New method not in PEP 3148.

    def remove_done_callback(self, fn):
        """Remove all instances of a callback from the "call when done" list.

        Returns the number of callbacks removed.
        """
        filtered_callbacks = [f for f in self._callbacks if f != fn]
        removed_count = len(self._callbacks) - len(filtered_callbacks)
        if removed_count:
            self._callbacks[:] = filtered_callbacks
        return removed_count

    # So-called internal methods (note: no set_running_or_notify_cancel()).

    def set_result(self, result):
        """Mark the future done and set its result.

        If the future is already done when this method is called, raises
        InvalidStateError.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = _FINISHED
        self._schedule_callbacks()

    def set_exception(self, exception):
        """Mark the future done and set an exception.

        The exception may be given as an instance or as an exception
        class; a class is instantiated without arguments.

        If the future is already done when this method is called, raises
        InvalidStateError.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        # The traceback logger reads exception.__traceback__, which is only
        # meaningful on an instance, so turn a bare class into an instance.
        if isinstance(exception, type):
            exception = exception()
        self._exception = exception
        self._tb_logger = _TracebackLogger(exception)
        self._state = _FINISHED
        self._schedule_callbacks()
        # Arrange for the logger to be activated after all callbacks
        # have had a chance to call result() or exception().
        self._loop.call_soon(self._tb_logger.activate)

    # Truly internal methods.

    def _copy_state(self, other):
        """Internal helper to copy state from another Future.

        The other Future may be a concurrent.futures.Future.  It must
        already be done.  If this future was cancelled in the meantime,
        nothing is copied.
        """
        assert other.done()
        # The destination may have been cancelled while the source was
        # still running; there is nothing left to copy in that case.
        if self.cancelled():
            return
        assert not self.done()
        if other.cancelled():
            self.cancel()
        else:
            exception = other.exception()
            if exception is not None:
                self.set_exception(exception)
            else:
                result = other.result()
                self.set_result(result)

    def __iter__(self):
        """Support 'yield from future': wait until done, then give the result."""
        if not self.done():
            self._blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.
321
322
def wrap_future(fut, *, loop=None):
    """Wrap a concurrent.futures.Future object in a Future of this module.

    If fut is already a Future of this module it is returned unchanged.
    Otherwise a new Future bound to loop is returned; when loop is None,
    events.get_event_loop() is used.  Once fut completes, its outcome
    is copied into the new future through loop.call_soon_threadsafe().
    Cancelling the returned future also requests cancellation of fut.
    """
    if isinstance(fut, Future):
        return fut

    assert isinstance(fut, concurrent.futures.Future), \
        'concurrent.futures.Future is expected, got {!r}'.format(fut)

    if loop is None:
        loop = events.get_event_loop()

    new_future = Future(loop=loop)

    def _propagate_cancel(wrapper):
        # Forward a cancellation of the wrapper to the wrapped future.
        if wrapper.cancelled():
            fut.cancel()

    def _transfer_state():
        # The wrapper may have been cancelled before fut finished;
        # copying into it then would trip _copy_state()'s assertion.
        if new_future.cancelled():
            return
        new_future._copy_state(fut)

    new_future.add_done_callback(_propagate_cancel)
    fut.add_done_callback(
        lambda future: loop.call_soon_threadsafe(_transfer_state))
    return new_future
338 return new_future