blob: edc13dcc474fb86f39e2abfceded678fdbdd6a21 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""A Future class similar to the one in PEP 3148."""
2
# Public names exported by a star-import of this module.
__all__ = [
    'CancelledError',
    'TimeoutError',
    'InvalidStateError',
    'Future',
    'wrap_future',
]
7
8import concurrent.futures._base
9import logging
Victor Stinner313a9802014-07-29 12:58:23 +020010import reprlib
Victor Stinner4c3c6992013-12-19 22:42:40 +010011import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import traceback
13
Victor Stinner71080fc2015-07-25 02:23:21 +020014from . import compat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016
# Lifecycle states of a Future (stored in Future._state).
_PENDING = 'PENDING'
_CANCELLED = 'CANCELLED'
_FINISHED = 'FINISHED'

# Exception types shared with the concurrent.futures package, re-bound
# here under module-level names.
Error = concurrent.futures._base.Error
CancelledError = concurrent.futures.CancelledError
TimeoutError = concurrent.futures.TimeoutError

# Logging level one step below DEBUG, reserved for heavy-duty debugging.
STACK_DEBUG = logging.DEBUG - 1
27
28
class InvalidStateError(Error):
    """Raised when an operation is not permitted in the future's state."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
32
class _TracebackLogger:
    """Log an unretrieved exception's traceback when destroyed.

    A Future or Task may have an exception set that nobody ever asks
    for.  Without extra help such an exception would vanish silently,
    which contradicts the Zen of Python: 'Errors should never pass
    silently.  Unless explicitly silenced.'

    Logging at set_exception() time would be wrong, though: well
    written callers fetch the exception and deal with it themselves.
    The report is only wanted when neither result() nor exception()
    was ever called, so that developers do not lose time wondering why
    buggy code fails without a trace.

    Putting __del__() on the Future class itself was tried first and
    backfired, because the presence of __del__() prevents garbage
    collection from breaking reference cycles.  The way out is to keep
    __del__() off the Future and put it on this helper instead.  The
    helper must never be part of a cycle, and only the Future holds a
    reference to it.

    Life cycle:

    - set_exception() creates the helper and stores it on the Future.
    - When the Future is collected while the helper is still attached,
      the helper is collected too and its __del__() logs the
      traceback.
    - When result() or exception() is called on the Future, the helper
      is clear()-ed and dropped, so nothing is logged.

    Storing the exception object permanently would be cheaper than
    formatting it, but the exception references its traceback, which
    references stack frames, which may reference the Future, which
    references this helper -- exactly the cycle we must avoid.  As a
    compromise the exception is kept only until activate() runs;
    activate() formats the traceback into plain strings and drops the
    exception.  The call to activate() is delayed until the Future's
    callbacks have had a chance to run.  A Future usually has at least
    one callback (typically installed by 'yield from') and that
    callback usually retrieves the exception, which removes the need
    to format anything at all.

    PS. The original author does not claim credit for this solution;
    it was first heard in a discussion about closing files when they
    are collected.
    """

    __slots__ = ('loop', 'source_traceback', 'exc', 'tb')

    def __init__(self, future, exc):
        """Remember the future's loop and creation traceback, plus *exc*."""
        self.loop = future._loop
        self.source_traceback = future._source_traceback
        self.exc = exc
        # Filled in by activate(); a falsy value means "nothing to log".
        self.tb = None

    def activate(self):
        """Format the stored exception into text and release the exception."""
        pending = self.exc
        if pending is None:
            # Already activated or cleared: nothing left to format.
            return
        self.exc = None
        self.tb = traceback.format_exception(pending.__class__, pending,
                                             pending.__traceback__)

    def clear(self):
        """Forget both the exception and any formatted traceback."""
        self.exc = None
        self.tb = None

    def __del__(self):
        """Report the formatted traceback to the loop's exception handler."""
        if not self.tb:
            return
        pieces = ['Future/Task exception was never retrieved\n']
        if self.source_traceback:
            created_at = ''.join(traceback.format_list(self.source_traceback))
            pieces.append('Future/Task created at (most recent call last):\n')
            pieces.append('%s\n' % created_at.rstrip())
        pieces.append(''.join(self.tb).rstrip())
        self.loop.call_exception_handler({'message': ''.join(pieces)})
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700111
112
class Future:
    """An event-loop future, *almost* compatible with concurrent.futures.Future.

    Differences:

    - result() and exception() do not take a timeout argument and
      raise an exception when the future isn't done yet.

    - Callbacks registered with add_done_callback() are always called
      via the event loop's call_soon().

    - This class is not compatible with the wait() and as_completed()
      methods in the concurrent.futures package.

    (In Python 3.4 or later we may be able to unify the implementations.)
    """

    # Class-level defaults for the instance attributes.
    _state = _PENDING
    _result = None
    _exception = None
    _loop = None
    _source_traceback = None

    # This attribute serves two purposes:
    # - Its presence marks a class as implementing the Future protocol
    #   (i.e. as intended to be duck-type compatible).  The value must
    #   also be not-None, so that a subclass can declare itself
    #   incompatible by setting it to None.
    # - __iter__() sets it so that Task._step() can tell
    #   `yield from Future()` (correct) apart from `yield Future()`
    #   (incorrect).
    _asyncio_future_blocking = False

    _log_traceback = False   # Used for Python 3.4 and later
    _tb_logger = None        # Used for Python 3.3 only

    def __init__(self, *, loop=None):
        """Initialize the future.

        The optional keyword-only *loop* argument explicitly sets the
        event loop used by the future.  When it is omitted, the loop
        returned by events.get_event_loop() is used.
        """
        self._loop = events.get_event_loop() if loop is None else loop
        self._callbacks = []
        if self._loop.get_debug():
            # In debug mode remember where the future was created
            # (the caller's frame, one level up from here).
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __format_callbacks(self):
        """Return a short 'cb=[...]' summary of the pending callbacks."""
        callbacks = self._callbacks
        count = len(callbacks)

        def describe(callback):
            return events._format_callback_source(callback, ())

        if count == 0:
            text = ''
        elif count == 1:
            text = describe(callbacks[0])
        elif count == 2:
            text = '{}, {}'.format(describe(callbacks[0]),
                                   describe(callbacks[1]))
        else:
            # Only the first and last callback are shown in full.
            text = '{}, <{} more>, {}'.format(describe(callbacks[0]),
                                              count - 2,
                                              describe(callbacks[-1]))
        return 'cb=[%s]' % text

    def _repr_info(self):
        """Return the list of text fragments that make up the repr."""
        info = [self._state.lower()]
        if self._state == _FINISHED:
            if self._exception is None:
                # reprlib limits the length of the output, which matters
                # especially for very long strings.
                info.append('result={}'.format(reprlib.repr(self._result)))
            else:
                info.append('exception={!r}'.format(self._exception))
        if self._callbacks:
            info.append(self.__format_callbacks())
        if self._source_traceback:
            frame = self._source_traceback[-1]
            info.append('created at %s:%s' % (frame[0], frame[1]))
        return info

    def __repr__(self):
        return '<%s %s>' % (self.__class__.__name__,
                            ' '.join(self._repr_info()))

    # On Python 3.3 and older, objects with a destructor that are part
    # of a reference cycle are never destroyed.  This is no longer the
    # case on Python 3.4 thanks to PEP 442.
    if compat.PY34:
        def __del__(self):
            """Report an exception that was set but never retrieved."""
            if not self._log_traceback:
                # set_exception() was not called, or result() or
                # exception() has consumed the exception.
                return
            context = {
                'message': ('%s exception was never retrieved'
                            % self.__class__.__name__),
                'exception': self._exception,
                'future': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)

    def cancel(self):
        """Cancel the future and schedule callbacks.

        Return False when the future is already done or cancelled.
        Otherwise switch the state to cancelled, schedule the callbacks
        and return True.
        """
        if self._state == _PENDING:
            self._state = _CANCELLED
            self._schedule_callbacks()
            return True
        return False

    def _schedule_callbacks(self):
        """Internal: ask the event loop to call all callbacks.

        Each callback is handed to the loop's call_soon() with this
        future as its argument, and the callback list is emptied.
        """
        scheduled = self._callbacks[:]
        if scheduled:
            # Empty the list in place before scheduling anything.
            self._callbacks[:] = []
            for callback in scheduled:
                self._loop.call_soon(callback, self)

    def cancelled(self):
        """Return True if the future was cancelled."""
        return self._state == _CANCELLED

    # Don't implement running(); see http://bugs.python.org/issue18699

    def done(self):
        """Return True if the future is done.

        Done means that a result or an exception is available, or that
        the future was cancelled.
        """
        return self._state != _PENDING

    def result(self):
        """Return the result this future represents.

        Raises CancelledError if the future has been cancelled and
        InvalidStateError if the result isn't available yet.  If the
        future is done and has an exception set, that exception is
        raised.
        """
        state = self._state
        if state == _CANCELLED:
            raise CancelledError
        if state != _FINISHED:
            raise InvalidStateError('Result is not ready.')
        # The outcome is being consumed: disarm "never retrieved" logging.
        self._log_traceback = False
        tb_logger = self._tb_logger
        if tb_logger is not None:
            tb_logger.clear()
            self._tb_logger = None
        error = self._exception
        if error is not None:
            raise error
        return self._result

    def exception(self):
        """Return the exception that was set on this future.

        The exception (or None if no exception was set) is returned
        only if the future is done.  Raises CancelledError if the future
        has been cancelled and InvalidStateError if it isn't done yet.
        """
        state = self._state
        if state == _CANCELLED:
            raise CancelledError
        if state != _FINISHED:
            raise InvalidStateError('Exception is not set.')
        # The outcome is being consumed: disarm "never retrieved" logging.
        self._log_traceback = False
        tb_logger = self._tb_logger
        if tb_logger is not None:
            tb_logger.clear()
            self._tb_logger = None
        return self._exception

    def add_done_callback(self, fn):
        """Add a callback to be run when the future becomes done.

        The callback is called with a single argument - the future
        object.  If the future is already done when this is called, the
        callback is scheduled with call_soon.
        """
        if self._state == _PENDING:
            self._callbacks.append(fn)
        else:
            self._loop.call_soon(fn, self)

    # New method not in PEP 3148.

    def remove_done_callback(self, fn):
        """Remove all instances of a callback from the "call when done" list.

        Returns the number of callbacks removed.
        """
        kept = [cb for cb in self._callbacks if cb != fn]
        removed = len(self._callbacks) - len(kept)
        if removed:
            self._callbacks[:] = kept
        return removed

    # So-called internal methods (note: no set_running_or_notify_cancel()).

    def set_result(self, result):
        """Mark the future done and set its result.

        Raises InvalidStateError if the future is already done when
        this method is called.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = _FINISHED
        self._schedule_callbacks()

    def set_exception(self, exception):
        """Mark the future done and set an exception.

        *exception* may be an exception instance or an exception class;
        a class is instantiated without arguments.  Raises
        InvalidStateError if the future is already done, and TypeError
        if the exception is exactly a StopIteration.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        if isinstance(exception, type):
            exception = exception()
        if type(exception) is StopIteration:
            raise TypeError("StopIteration interacts badly with generators "
                            "and cannot be raised into a Future")
        self._exception = exception
        self._state = _FINISHED
        self._schedule_callbacks()
        if compat.PY34:
            self._log_traceback = True
        else:
            self._tb_logger = _TracebackLogger(self, exception)
            # Arrange for the logger to be activated after all callbacks
            # have had a chance to call result() or exception().
            self._loop.call_soon(self._tb_logger.activate)

    def __iter__(self):
        """Yield the future once if it is not done, then return its result."""
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.

    if compat.PY35:
        __await__ = __iter__  # make compatible with 'await' expression
375
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700376
def _set_result_unless_cancelled(fut, result):
    """Set *result* on *fut*, doing nothing if *fut* was cancelled."""
    if not fut.cancelled():
        fut.set_result(result)
382
383
def _set_concurrent_future_state(concurrent, source):
    """Copy state from a future to a concurrent.futures.Future.

    *source* must already be done.  A cancelled source cancels
    *concurrent*; nothing more is copied when
    concurrent.set_running_or_notify_cancel() returns false.
    Otherwise the source's exception or result is set on *concurrent*.
    """
    assert source.done()
    if source.cancelled():
        concurrent.cancel()
    if not concurrent.set_running_or_notify_cancel():
        return
    error = source.exception()
    if error is None:
        concurrent.set_result(source.result())
    else:
        concurrent.set_exception(error)
397
398
def _copy_future_state(source, dest):
    """Internal helper to copy state from another Future.

    The other Future may be a concurrent.futures.Future.  *source*
    must already be done.  Nothing happens when *dest* is already
    cancelled; otherwise *dest* must still be pending and receives the
    source's cancellation, exception or result.
    """
    assert source.done()
    if dest.cancelled():
        return
    assert not dest.done()
    if source.cancelled():
        dest.cancel()
        return
    error = source.exception()
    if error is None:
        dest.set_result(source.result())
    else:
        dest.set_exception(error)
417
418
def _chain_future(source, destination):
    """Chain two futures so that when one completes, so does the other.

    The result (or exception) of source will be copied to destination.
    If destination is cancelled, source gets cancelled too.
    Compatible with both asyncio.Future and concurrent.futures.Future.

    Raises TypeError if either argument is not one of those two types.
    """
    accepted = (Future, concurrent.futures.Future)
    if not isinstance(source, accepted):
        raise TypeError('A future is required for source argument')
    if not isinstance(destination, accepted):
        raise TypeError('A future is required for destination argument')

    # Only this module's Future has a loop; the concurrent kind gets None.
    source_loop = source._loop if isinstance(source, Future) else None
    dest_loop = destination._loop if isinstance(destination, Future) else None

    def _set_state(future, other):
        # Pick the copy helper that matches the kind of the target.
        if isinstance(future, Future):
            _copy_future_state(other, future)
        else:
            _set_concurrent_future_state(future, other)

    def _call_check_cancel(done_destination):
        # Done-callback of destination: propagate cancellation to source.
        if not done_destination.cancelled():
            return
        if source_loop is None or source_loop is dest_loop:
            source.cancel()
        else:
            # Source belongs to a different loop: hand the call over to it.
            source_loop.call_soon_threadsafe(source.cancel)

    def _call_set_state(done_source):
        # Done-callback of source: copy its outcome to destination.
        if dest_loop is None or dest_loop is source_loop:
            _set_state(destination, done_source)
        else:
            # Destination belongs to a different loop: hand the call over.
            dest_loop.call_soon_threadsafe(_set_state, destination,
                                           done_source)

    destination.add_done_callback(_call_check_cancel)
    source.add_done_callback(_call_set_state)
454
455
def wrap_future(future, *, loop=None):
    """Wrap concurrent.futures.Future object.

    A Future of this module is returned unchanged.  Otherwise a new
    future is created on *loop* (or on events.get_event_loop() when
    *loop* is None), chained to *future*, and returned.
    """
    if isinstance(future, Future):
        return future
    assert isinstance(future, concurrent.futures.Future), \
        'concurrent.futures.Future is expected, got {!r}'.format(future)
    target_loop = events.get_event_loop() if loop is None else loop
    wrapped = target_loop.create_future()
    _chain_future(future, wrapped)
    return wrapped