blob: ddb9cde188395ceb22537287c85998342f18c95c [file] [log] [blame]
"""A Future class similar to the one in PEP 3148."""
2
# Names exported by ``from <this module> import *``.
__all__ = ['CancelledError', 'TimeoutError',
           'InvalidStateError',
           'Future', 'wrap_future',
           ]
7
8import concurrent.futures._base
9import logging
Victor Stinner313a9802014-07-29 12:58:23 +020010import reprlib
Victor Stinner4c3c6992013-12-19 22:42:40 +010011import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import traceback
13
Victor Stinner71080fc2015-07-25 02:23:21 +020014from . import compat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016
# States for Future.
_PENDING = 'PENDING'
_CANCELLED = 'CANCELLED'
_FINISHED = 'FINISHED'

# Re-export the concurrent.futures exception classes under this module's
# namespace so that both packages share the same exception types.
Error = concurrent.futures._base.Error
CancelledError = concurrent.futures.CancelledError
TimeoutError = concurrent.futures.TimeoutError

STACK_DEBUG = logging.DEBUG - 1  # heavy-duty debugging
27
28
class InvalidStateError(Error):
    """The operation is not allowed in this state."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
32
class _TracebackLogger:
    """Helper to log a traceback upon destruction if not cleared.

    This solves a nasty problem with Futures and Tasks that have an
    exception set: if nobody asks for the exception, the exception is
    never logged.  This violates the Zen of Python: 'Errors should
    never pass silently.  Unless explicitly silenced.'

    However, we don't want to log the exception as soon as
    set_exception() is called: if the calling code is written
    properly, it will get the exception and handle it properly.  But
    we *do* want to log it if result() or exception() was never called
    -- otherwise developers waste a lot of time wondering why their
    buggy code fails silently.

    An earlier attempt added a __del__() method to the Future class
    itself, but this backfired because the presence of __del__()
    prevents garbage collection from breaking cycles.  A way out of
    this catch-22 is to avoid having a __del__() method on the Future
    class itself, but instead to have a reference to a helper object
    with a __del__() method that logs the traceback, where we ensure
    that the helper object doesn't participate in cycles, and only the
    Future has a reference to it.

    The helper object is added when set_exception() is called.  When
    the Future is collected, and the helper is present, the helper
    object is also collected, and its __del__() method will log the
    traceback.  When the Future's result() or exception() method is
    called (and a helper object is present), it removes the helper
    object, after calling its clear() method to prevent it from
    logging.

    One downside is that we do a fair amount of work to extract the
    traceback from the exception, even when it is never logged.  It
    would seem cheaper to just store the exception object, but that
    references the traceback, which references stack frames, which may
    reference the Future, which references the _TracebackLogger, and
    then the _TracebackLogger would be included in a cycle, which is
    what we're trying to avoid!  As an optimization, we don't
    immediately format the exception; we only do the work when
    activate() is called, which call is delayed until after all the
    Future's callbacks have run.  Usually a Future has at least one
    callback (typically set by 'yield from') and usually that
    callback extracts the exception, thereby removing the need to
    format the exception.

    PS. I don't claim credit for this solution.  I first heard of it
    in a discussion about closing files when they are collected.
    """

    __slots__ = ('loop', 'source_traceback', 'exc', 'tb')

    def __init__(self, future, exc):
        """Remember the future's loop and source traceback, and *exc*.

        Only ``future._loop`` and ``future._source_traceback`` are read;
        no reference to the future itself is kept.
        """
        self.loop = future._loop
        self.source_traceback = future._source_traceback
        self.exc = exc
        self.tb = None

    def activate(self):
        """Format the stored exception into ``tb`` and drop the exception.

        Does nothing if the exception was already consumed or cleared.
        """
        exc = self.exc
        if exc is not None:
            # Release the exception object first; only the formatted
            # text lines are kept from here on.
            self.exc = None
            self.tb = traceback.format_exception(exc.__class__, exc,
                                                 exc.__traceback__)

    def clear(self):
        """Forget both the exception and any formatted traceback."""
        self.exc = None
        self.tb = None

    def __del__(self):
        """Report the formatted traceback, if any, to the loop's handler."""
        if self.tb:
            msg = 'Future/Task exception was never retrieved\n'
            if self.source_traceback:
                src = ''.join(traceback.format_list(self.source_traceback))
                msg += 'Future/Task created at (most recent call last):\n'
                msg += '%s\n' % src.rstrip()
            msg += ''.join(self.tb).rstrip()
            self.loop.call_exception_handler({'message': msg})
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700111
112
class Future:
    """This class is *almost* compatible with concurrent.futures.Future.

    Differences:

    - result() and exception() do not take a timeout argument and
      raise an exception when the future isn't done yet.

    - Callbacks registered with add_done_callback() are always called
      via the event loop's call_soon().

    - This class is not compatible with the wait() and as_completed()
      methods in the concurrent.futures package.

    (In Python 3.4 or later we may be able to unify the implementations.)
    """

    # Class variables serving as defaults for instance variables.
    _state = _PENDING
    _result = None
    _exception = None
    _loop = None
    _source_traceback = None

    _blocking = False  # proper use of future (yield vs yield from)

    _log_traceback = False   # Used for Python 3.4 and later
    _tb_logger = None        # Used for Python 3.3 only

    def __init__(self, *, loop=None):
        """Initialize the future.

        The optional loop argument allows explicitly setting the event
        loop object used by the future. If it's not provided, the future
        uses the default event loop.
        """
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._callbacks = []
        if self._loop.get_debug():
            # Remember where the future was created (the caller's frame)
            # so it can be shown in repr() and in "never retrieved" reports.
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __format_callbacks(self):
        """Return a 'cb=[...]' summary of the registered callbacks.

        At most the first and last callbacks are shown; any in between
        are collapsed into '<N more>'.
        """
        cb = self._callbacks
        size = len(cb)
        if not size:
            cb = ''

        def format_cb(callback):
            return events._format_callback_source(callback, ())

        if size == 1:
            cb = format_cb(cb[0])
        elif size == 2:
            cb = '{}, {}'.format(format_cb(cb[0]), format_cb(cb[1]))
        elif size > 2:
            cb = '{}, <{} more>, {}'.format(format_cb(cb[0]),
                                            size-2,
                                            format_cb(cb[-1]))
        return 'cb=[%s]' % cb

    def _repr_info(self):
        """Return the list of strings that __repr__() joins together."""
        info = [self._state.lower()]
        if self._state == _FINISHED:
            if self._exception is not None:
                info.append('exception={!r}'.format(self._exception))
            else:
                # use reprlib to limit the length of the output, especially
                # for very long strings
                result = reprlib.repr(self._result)
                info.append('result={}'.format(result))
        if self._callbacks:
            info.append(self.__format_callbacks())
        if self._source_traceback:
            frame = self._source_traceback[-1]
            info.append('created at %s:%s' % (frame[0], frame[1]))
        return info

    def __repr__(self):
        info = self._repr_info()
        return '<%s %s>' % (self.__class__.__name__, ' '.join(info))

    # On Python 3.3 and older, objects with a destructor that are part of
    # a reference cycle are never destroyed. This is no longer the case on
    # Python 3.4 thanks to PEP 442.
    if compat.PY34:
        def __del__(self):
            """Report an exception that was set but never retrieved."""
            if not self._log_traceback:
                # set_exception() was not called, or result() or exception()
                # has consumed the exception
                return
            exc = self._exception
            context = {
                'message': ('%s exception was never retrieved'
                            % self.__class__.__name__),
                'exception': exc,
                'future': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)

    def cancel(self):
        """Cancel the future and schedule callbacks.

        If the future is already done or cancelled, return False.  Otherwise,
        change the future's state to cancelled, schedule the callbacks and
        return True.
        """
        if self._state != _PENDING:
            return False
        self._state = _CANCELLED
        self._schedule_callbacks()
        return True

    def _schedule_callbacks(self):
        """Internal: Ask the event loop to call all callbacks.

        The callbacks are scheduled to be called as soon as possible. Also
        clears the callback list.
        """
        callbacks = self._callbacks[:]
        if not callbacks:
            return

        self._callbacks[:] = []
        for callback in callbacks:
            self._loop.call_soon(callback, self)

    def cancelled(self):
        """Return True if the future was cancelled."""
        return self._state == _CANCELLED

    # Don't implement running(); see http://bugs.python.org/issue18699

    def done(self):
        """Return True if the future is done.

        Done means either that a result / exception are available, or that the
        future was cancelled.
        """
        return self._state != _PENDING

    def result(self):
        """Return the result this future represents.

        If the future has been cancelled, raises CancelledError.  If the
        future's result isn't yet available, raises InvalidStateError.  If
        the future is done and has an exception set, this exception is raised.
        """
        if self._state == _CANCELLED:
            raise CancelledError
        if self._state != _FINISHED:
            raise InvalidStateError('Result is not ready.')
        # The outcome is being consumed: disarm "never retrieved" logging.
        self._log_traceback = False
        if self._tb_logger is not None:
            self._tb_logger.clear()
            self._tb_logger = None
        if self._exception is not None:
            raise self._exception
        return self._result

    def exception(self):
        """Return the exception that was set on this future.

        The exception (or None if no exception was set) is returned only if
        the future is done.  If the future has been cancelled, raises
        CancelledError.  If the future isn't done yet, raises
        InvalidStateError.
        """
        if self._state == _CANCELLED:
            raise CancelledError
        if self._state != _FINISHED:
            raise InvalidStateError('Exception is not set.')
        # The outcome is being consumed: disarm "never retrieved" logging.
        self._log_traceback = False
        if self._tb_logger is not None:
            self._tb_logger.clear()
            self._tb_logger = None
        return self._exception

    def add_done_callback(self, fn):
        """Add a callback to be run when the future becomes done.

        The callback is called with a single argument - the future object. If
        the future is already done when this is called, the callback is
        scheduled with call_soon.
        """
        if self._state != _PENDING:
            self._loop.call_soon(fn, self)
        else:
            self._callbacks.append(fn)

    # New method not in PEP 3148.

    def remove_done_callback(self, fn):
        """Remove all instances of a callback from the "call when done" list.

        Returns the number of callbacks removed.
        """
        filtered_callbacks = [f for f in self._callbacks if f != fn]
        removed_count = len(self._callbacks) - len(filtered_callbacks)
        if removed_count:
            self._callbacks[:] = filtered_callbacks
        return removed_count

    # So-called internal methods (note: no set_running_or_notify_cancel()).

    def set_result(self, result):
        """Mark the future done and set its result.

        If the future is already done when this method is called, raises
        InvalidStateError.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = _FINISHED
        self._schedule_callbacks()

    def set_exception(self, exception):
        """Mark the future done and set an exception.

        If the future is already done when this method is called, raises
        InvalidStateError.  An exception class is instantiated with no
        arguments first.  A plain StopIteration is rejected with TypeError.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        if isinstance(exception, type):
            exception = exception()
        if type(exception) is StopIteration:
            raise TypeError("StopIteration interacts badly with generators "
                            "and cannot be raised into a Future")
        self._exception = exception
        self._state = _FINISHED
        self._schedule_callbacks()
        if compat.PY34:
            self._log_traceback = True
        else:
            self._tb_logger = _TracebackLogger(self, exception)
            # Arrange for the logger to be activated after all callbacks
            # have had a chance to call result() or exception().
            self._loop.call_soon(self._tb_logger.activate)

    def __iter__(self):
        """Yield the future itself while pending, then return its result."""
        if not self.done():
            self._blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.

    if compat.PY35:
        __await__ = __iter__  # make compatible with 'await' expression
367
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700368
def _set_result_unless_cancelled(fut, result):
    """Helper setting the result only if the future was not cancelled.

    A cancelled future is left untouched; otherwise fut.set_result() is
    called with *result*.
    """
    if fut.cancelled():
        return
    fut.set_result(result)
374
375
def _set_concurrent_future_state(concurrent, source):
    """Copy state from a future to a concurrent.futures.Future.

    *source* must already be done.  Note that the *concurrent* parameter
    shadows the ``concurrent`` package inside this function.
    """
    assert source.done()
    if source.cancelled():
        concurrent.cancel()
    # A false return means the target is cancelled: nothing more to copy.
    if not concurrent.set_running_or_notify_cancel():
        return
    exception = source.exception()
    if exception is not None:
        concurrent.set_exception(exception)
    else:
        result = source.result()
        concurrent.set_result(result)
389
390
def _copy_future_state(source, dest):
    """Internal helper to copy state from another Future.

    The other Future may be a concurrent.futures.Future.

    *source* must already be done.  If *dest* is already cancelled,
    nothing is copied; otherwise *dest* must still be pending.
    """
    assert source.done()
    if dest.cancelled():
        return
    assert not dest.done()
    if source.cancelled():
        dest.cancel()
    else:
        exception = source.exception()
        if exception is not None:
            dest.set_exception(exception)
        else:
            result = source.result()
            dest.set_result(result)
409
410
def _chain_future(source, destination):
    """Chain two futures so that when one completes, so does the other.

    The result (or exception) of source will be copied to destination.
    If destination is cancelled, source gets cancelled too.
    Compatible with both asyncio.Future and concurrent.futures.Future.

    Raises TypeError if either argument is not one of those two types.
    """
    if not isinstance(source, (Future, concurrent.futures.Future)):
        raise TypeError('A future is required for source argument')
    if not isinstance(destination, (Future, concurrent.futures.Future)):
        raise TypeError('A future is required for destination argument')
    # Only this module's Future carries an event loop; a
    # concurrent.futures.Future is treated as having none.
    source_loop = source._loop if isinstance(source, Future) else None
    dest_loop = destination._loop if isinstance(destination, Future) else None

    def _set_state(future, other):
        if isinstance(future, Future):
            _copy_future_state(other, future)
        else:
            _set_concurrent_future_state(future, other)

    def _call_check_cancel(destination):
        if destination.cancelled():
            # Cancel directly when there is no source loop or both futures
            # share a loop; otherwise hand the call over to the source loop.
            if source_loop is None or source_loop is dest_loop:
                source.cancel()
            else:
                source_loop.call_soon_threadsafe(source.cancel)

    def _call_set_state(source):
        # Same rule as above, applied to the destination's loop.
        if dest_loop is None or dest_loop is source_loop:
            _set_state(destination, source)
        else:
            dest_loop.call_soon_threadsafe(_set_state, destination, source)

    destination.add_done_callback(_call_check_cancel)
    source.add_done_callback(_call_set_state)
446
447
def wrap_future(future, *, loop=None):
    """Wrap concurrent.futures.Future object.

    If *future* is already an instance of this module's Future it is
    returned unchanged.  Otherwise a new Future is created on *loop* and
    chained to *future*, and the new Future is returned.
    """
    if isinstance(future, Future):
        return future
    assert isinstance(future, concurrent.futures.Future), \
        'concurrent.futures.Future is expected, got {!r}'.format(future)
    new_future = Future(loop=loop)
    _chain_future(future, new_future)
    return new_future