blob: 9ca8d8458bcf6248a54b286d5fd9b7fb719cb7d9 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""A Future class similar to the one in PEP 3148."""
2
# Public names of this module (what a star-import exposes).
__all__ = ['CancelledError', 'TimeoutError',
           'InvalidStateError',
           'Future', 'wrap_future', 'isfuture',
           ]
7
8import concurrent.futures._base
9import logging
Victor Stinner313a9802014-07-29 12:58:23 +020010import reprlib
Victor Stinner4c3c6992013-12-19 22:42:40 +010011import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import traceback
13
Victor Stinner71080fc2015-07-25 02:23:21 +020014from . import compat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016
# States for Future.  A future starts out pending and ends up either
# cancelled or finished (finished covers both a result and an exception).
_PENDING = 'PENDING'
_CANCELLED = 'CANCELLED'
_FINISHED = 'FINISHED'

# Exception types are shared with concurrent.futures rather than redefined.
Error = concurrent.futures._base.Error
CancelledError = concurrent.futures.CancelledError
TimeoutError = concurrent.futures.TimeoutError

# Numeric logging level one step below logging.DEBUG.
STACK_DEBUG = logging.DEBUG - 1  # heavy-duty debugging
27
28
class InvalidStateError(Error):
    """The operation is not allowed in this state.

    Raised by Future.result() and Future.exception() when the future is
    not done yet, and by Future.set_result() and Future.set_exception()
    when the future is no longer pending.
    """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
32
class _TracebackLogger:
    """Report a never-retrieved exception when this helper is destroyed.

    Problem being solved: a Future or Task can have an exception set
    that nobody ever asks for.  Without extra machinery that exception
    would vanish silently, which contradicts the Zen of Python
    ('Errors should never pass silently.  Unless explicitly silenced.').

    Logging right away in set_exception() would be wrong, because
    well-behaved code retrieves the exception and deals with it.  We
    only want a report when neither result() nor exception() was ever
    called, so that developers are not left guessing why buggy code
    fails without a trace.

    Putting a __del__() method on the Future class itself was tried
    first and backfired: the presence of __del__() prevents garbage
    collection from breaking cycles.  The workaround is to keep
    __del__() off the Future and let the Future own a small helper
    object (this class) whose __del__() does the logging.  The helper
    must not take part in any reference cycle, and only the Future may
    hold a reference to it.

    Lifecycle: the helper is created when set_exception() is called.
    If the Future is collected while the helper is still attached, the
    helper is collected as well and its __del__() logs the traceback.
    If result() or exception() is called first, the Future calls
    clear() on the helper and drops it, so nothing is logged.

    Cost: extracting the traceback text is a fair amount of work even
    when it is never logged.  Simply storing the exception object looks
    cheaper, but the exception references its traceback, which
    references stack frames, which may reference the Future, which
    references this helper -- and that would put the helper in a cycle,
    the very thing we are avoiding.  As an optimization the formatting
    is deferred until activate() is called, and that call is delayed
    until all of the Future's callbacks have run.  A Future usually has
    at least one callback (typically set by 'yield from') and that
    callback usually extracts the exception, which removes the need to
    format it at all.

    PS. I don't claim credit for this solution.  I first heard of it
    in a discussion about closing files when they are collected.
    """

    __slots__ = ('loop', 'source_traceback', 'exc', 'tb')

    def __init__(self, future, exc):
        """Remember the future's loop and creation traceback, and *exc*."""
        self.loop = future._loop
        self.source_traceback = future._source_traceback
        self.exc = exc
        # Filled in by activate(); stays None until then.
        self.tb = None

    def activate(self):
        """Turn the stored exception into formatted traceback lines.

        Does nothing when the exception was already consumed (by a
        previous activate() or by clear()).
        """
        pending = self.exc
        if pending is None:
            return
        # Drop the exception object itself; keep only its text form.
        self.exc = None
        self.tb = traceback.format_exception(
            pending.__class__, pending, pending.__traceback__)

    def clear(self):
        """Forget everything so that __del__() stays silent."""
        self.exc = None
        self.tb = None

    def __del__(self):
        """Hand the formatted traceback, if any, to the loop's handler."""
        if not self.tb:
            return
        pieces = ['Future/Task exception was never retrieved\n']
        if self.source_traceback:
            src = ''.join(traceback.format_list(self.source_traceback))
            pieces.append('Future/Task created at (most recent call last):\n')
            pieces.append('%s\n' % src.rstrip())
        pieces.append(''.join(self.tb).rstrip())
        self.loop.call_exception_handler({'message': ''.join(pieces)})
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700111
112
def isfuture(obj):
    """Tell whether *obj* follows the Future protocol.

    True is returned when the class of obj defines the
    _asyncio_future_blocking marker and the value seen through obj is
    not None.  A marker that exists only on the instance does not
    count, and a value of None reports the object as not compatible.
    """
    if not hasattr(obj.__class__, '_asyncio_future_blocking'):
        return False
    return obj._asyncio_future_blocking is not None
Guido van Rossum7b3b3dc2016-09-09 14:26:31 -0700122
123
class Future:
    """Result placeholder tied to an event loop.

    This class is *almost* compatible with concurrent.futures.Future.

    Differences:

    - result() and exception() do not take a timeout argument and
      raise InvalidStateError when the future isn't done yet.

    - Callbacks registered with add_done_callback() are never run
      inline; they are always handed to the event loop's call_soon().

    - This class is not compatible with the wait() and as_completed()
      functions in the concurrent.futures package.

    (In Python 3.4 or later we may be able to unify the implementations.)
    """

    # Class-level defaults; instances override them as their state changes.
    _state = _PENDING
    _result = None
    _exception = None
    _loop = None
    _source_traceback = None

    # This attribute does double duty:
    # - Merely defining it marks a class as implementing the Future
    #   protocol (duck-type compatibility).  The value has to be
    #   not-None; a subclass can opt out by setting it to None.
    # - __iter__() below sets it to True so that Task._step() can
    #   distinguish `yield from Future()` (correct) from
    #   `yield Future()` (incorrect).
    _asyncio_future_blocking = False

    _log_traceback = False   # Used for Python 3.4 and later
    _tb_logger = None        # Used for Python 3.3 only

    def __init__(self, *, loop=None):
        """Initialize the future.

        The optional keyword-only loop argument selects the event loop
        this future is bound to.  When it is omitted, the loop returned
        by events.get_event_loop() is used.
        """
        self._loop = events.get_event_loop() if loop is None else loop
        self._callbacks = []
        if self._loop.get_debug():
            # In debug mode remember where the future was created
            # (the stack as seen from the caller of __init__).
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __format_callbacks(self):
        """Return a 'cb=[...]' summary of the registered callbacks.

        Up to two callbacks are shown in full; for more, only the first
        and the last are shown with a count of the ones in between.
        """
        callbacks = self._callbacks
        count = len(callbacks)

        def describe(callback):
            return events._format_callback_source(callback, ())

        if count == 0:
            summary = ''
        elif count == 1:
            summary = describe(callbacks[0])
        elif count == 2:
            summary = '{}, {}'.format(describe(callbacks[0]),
                                      describe(callbacks[1]))
        else:
            summary = '{}, <{} more>, {}'.format(describe(callbacks[0]),
                                                 count - 2,
                                                 describe(callbacks[-1]))
        return 'cb=[%s]' % summary

    def _repr_info(self):
        """Return the list of strings that __repr__() joins together."""
        parts = [self._state.lower()]
        if self._state == _FINISHED:
            if self._exception is None:
                # reprlib keeps the output short, which matters for
                # results such as very long strings.
                shortened = reprlib.repr(self._result)
                parts.append('result={}'.format(shortened))
            else:
                parts.append('exception={!r}'.format(self._exception))
        if self._callbacks:
            parts.append(self.__format_callbacks())
        if self._source_traceback:
            # Innermost recorded frame: where the future was created.
            origin = self._source_traceback[-1]
            parts.append('created at %s:%s' % (origin[0], origin[1]))
        return parts

    def __repr__(self):
        details = ' '.join(self._repr_info())
        return '<%s %s>' % (self.__class__.__name__, details)

    # On Python 3.3 and older, an object that has a destructor and is part
    # of a reference cycle is never destroyed.  Thanks to PEP 442 this is
    # no longer true on Python 3.4, so __del__() is only defined there.
    if compat.PY34:
        def __del__(self):
            """Report an exception that was set but never retrieved."""
            if not self._log_traceback:
                # Either set_exception() was never called, or result()
                # or exception() already consumed the exception.
                return
            context = {
                'message': ('%s exception was never retrieved'
                            % self.__class__.__name__),
                'exception': self._exception,
                'future': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)

    def cancel(self):
        """Cancel the future and schedule callbacks.

        Returns False without doing anything when the future is already
        done or cancelled.  Otherwise the state becomes cancelled, the
        callbacks are scheduled and True is returned.
        """
        if self._state == _PENDING:
            self._state = _CANCELLED
            self._schedule_callbacks()
            return True
        return False

    def _schedule_callbacks(self):
        """Internal: Ask the event loop to call all callbacks.

        Every registered callback is scheduled with call_soon(), with
        this future as its argument, and the callback list is emptied.
        """
        snapshot = self._callbacks[:]
        if snapshot:
            # Empty the list in place before scheduling anything.
            self._callbacks[:] = []
            for callback in snapshot:
                self._loop.call_soon(callback, self)

    def cancelled(self):
        """Return True if the future was cancelled."""
        return self._state == _CANCELLED

    # Don't implement running(); see http://bugs.python.org/issue18699

    def done(self):
        """Return True if the future is done.

        Done means that a result or an exception is available, or that
        the future was cancelled -- i.e. anything but pending.
        """
        return self._state != _PENDING

    def __mark_exception_retrieved(self):
        """Internal: switch off both 'never retrieved' reporting paths."""
        self._log_traceback = False
        if self._tb_logger is not None:
            self._tb_logger.clear()
            self._tb_logger = None

    def result(self):
        """Return the result this future represents.

        Raises CancelledError if the future was cancelled and
        InvalidStateError if the result is not available yet.  If the
        future finished with an exception, that exception is raised.
        """
        if self._state == _CANCELLED:
            raise CancelledError
        if self._state != _FINISHED:
            raise InvalidStateError('Result is not ready.')
        self.__mark_exception_retrieved()
        if self._exception is not None:
            raise self._exception
        return self._result

    def exception(self):
        """Return the exception that was set on this future.

        The exception (or None if none was set) is only returned once
        the future is done.  Raises CancelledError if the future was
        cancelled and InvalidStateError if it isn't done yet.
        """
        if self._state == _CANCELLED:
            raise CancelledError
        if self._state != _FINISHED:
            raise InvalidStateError('Exception is not set.')
        self.__mark_exception_retrieved()
        return self._exception

    def add_done_callback(self, fn):
        """Add a callback to be run when the future becomes done.

        The callback receives a single argument, the future object.
        When the future is already done at the time of this call, the
        callback is scheduled with call_soon() instead of being stored.
        """
        if self._state == _PENDING:
            self._callbacks.append(fn)
        else:
            self._loop.call_soon(fn, self)

    # New method not in PEP 3148.

    def remove_done_callback(self, fn):
        """Remove all instances of a callback from the "call when done" list.

        Returns the number of callbacks removed.
        """
        kept = [cb for cb in self._callbacks if cb != fn]
        removed = len(self._callbacks) - len(kept)
        if removed:
            self._callbacks[:] = kept
        return removed

    # So-called internal methods (note: no set_running_or_notify_cancel()).

    def set_result(self, result):
        """Mark the future done and set its result.

        Raises InvalidStateError if the future is no longer pending.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = _FINISHED
        self._schedule_callbacks()

    def set_exception(self, exception):
        """Mark the future done and set an exception.

        An exception class is instantiated (without arguments) first.
        Raises InvalidStateError if the future is no longer pending and
        TypeError if the exception is exactly a StopIteration.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        if isinstance(exception, type):
            exception = exception()
        if type(exception) is StopIteration:
            raise TypeError("StopIteration interacts badly with generators "
                            "and cannot be raised into a Future")
        self._exception = exception
        self._state = _FINISHED
        self._schedule_callbacks()
        if compat.PY34:
            # __del__() will report the exception unless it gets retrieved.
            self._log_traceback = True
        else:
            self._tb_logger = _TracebackLogger(self, exception)
            # Activate the logger only after every callback has had the
            # opportunity to call result() or exception().
            self._loop.call_soon(self._tb_logger.activate)

    def __iter__(self):
        """Yield the future itself while pending, then return its result."""
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.

    if compat.PY35:
        __await__ = __iter__  # make compatible with 'await' expression
386
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700387
def _set_result_unless_cancelled(fut, result):
    """Set *result* on *fut*, silently skipping a cancelled future."""
    if not fut.cancelled():
        fut.set_result(result)
393
394
def _set_concurrent_future_state(concurrent, source):
    """Copy state from a future to a concurrent.futures.Future.

    *source* must already be done.  A cancelled source cancels
    *concurrent*.  Nothing more is copied when
    concurrent.set_running_or_notify_cancel() returns a false value;
    otherwise the source's exception, or else its result, is
    transferred.
    """
    assert source.done()
    if source.cancelled():
        concurrent.cancel()
    if concurrent.set_running_or_notify_cancel():
        error = source.exception()
        if error is None:
            concurrent.set_result(source.result())
        else:
            concurrent.set_exception(error)
408
409
def _copy_future_state(source, dest):
    """Internal helper to copy state from another Future.

    The other Future may be a concurrent.futures.Future.  *source* must
    be done.  A *dest* that is already cancelled is left untouched;
    otherwise *dest* must not be done yet and receives the source's
    cancellation, exception or result.
    """
    assert source.done()
    if dest.cancelled():
        return
    assert not dest.done()
    if source.cancelled():
        dest.cancel()
        return
    error = source.exception()
    if error is None:
        dest.set_result(source.result())
    else:
        dest.set_exception(error)
428
429
def _chain_future(source, destination):
    """Chain two futures so that when one completes, so does the other.

    The outcome of source (result, exception or cancellation) is copied
    to destination once source is done.  Cancelling destination cancels
    source as well.  Both arguments may be either a future as recognized
    by isfuture() or a concurrent.futures.Future; anything else raises
    TypeError.
    """
    source_is_aio = isfuture(source)
    if not (source_is_aio or isinstance(source, concurrent.futures.Future)):
        raise TypeError('A future is required for source argument')
    dest_is_aio = isfuture(destination)
    if not (dest_is_aio or
            isinstance(destination, concurrent.futures.Future)):
        raise TypeError('A future is required for destination argument')

    # A concurrent.futures.Future has no event loop; None stands for that.
    source_loop = source._loop if source_is_aio else None
    dest_loop = destination._loop if dest_is_aio else None

    def _set_state(future, other):
        # Pick the copy helper that matches the kind of the target.
        if isfuture(future):
            _copy_future_state(other, future)
        else:
            _set_concurrent_future_state(future, other)

    def _call_check_cancel(done_destination):
        if not done_destination.cancelled():
            return
        if source_loop is None or source_loop is dest_loop:
            source.cancel()
        else:
            # source is bound to a different loop: ask that loop to cancel it.
            source_loop.call_soon_threadsafe(source.cancel)

    def _call_set_state(done_source):
        if dest_loop is None or dest_loop is source_loop:
            _set_state(destination, done_source)
        else:
            # destination is bound to a different loop: update it from there.
            dest_loop.call_soon_threadsafe(_set_state, destination,
                                           done_source)

    destination.add_done_callback(_call_check_cancel)
    source.add_done_callback(_call_set_state)
467
468
def wrap_future(future, *, loop=None):
    """Wrap concurrent.futures.Future object.

    An object that isfuture() already accepts is returned unchanged.
    Otherwise a new future is created on *loop* (or on the loop returned
    by events.get_event_loop() when loop is None) and chained to
    *future*, and that new future is returned.
    """
    if isfuture(future):
        return future
    assert isinstance(future, concurrent.futures.Future), \
        'concurrent.futures.Future is expected, got {!r}'.format(future)
    target_loop = events.get_event_loop() if loop is None else loop
    wrapper = target_loop.create_future()
    _chain_future(future, wrapper)
    return wrapper
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700479 return new_future