blob: 6cfded307555a3a24989129f121e59ac8e027c84 [file] [log] [blame]
Brian Quinlan81c4d362010-09-18 22:35:02 +00001# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4__author__ = 'Brian Quinlan (brian@sweetapp.com)'
5
6import collections
7import functools
8import logging
9import threading
10import time
11
# Values accepted by wait() for its ``return_when`` argument.
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
# Private condition that as_completed() passes when installing its waiter.
_AS_COMPLETED = '_AS_COMPLETED'

# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# The future was cancelled by the user...
CANCELLED = 'CANCELLED'
# ...and _Waiter.add_cancelled() was called by a worker.
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'

# Every state a future can be in.
_FUTURE_STATES = [
    PENDING,
    RUNNING,
    CANCELLED,
    CANCELLED_AND_NOTIFIED,
    FINISHED,
]

# Human-readable state names used by Future.__repr__; both cancelled states
# are shown the same way.
_STATE_TO_DESCRIPTION_MAP = {
    PENDING: "pending",
    RUNNING: "running",
    CANCELLED: "cancelled",
    CANCELLED_AND_NOTIFIED: "cancelled",
    FINISHED: "finished",
}

# Logger for internal use by the futures package.
LOGGER = logging.getLogger("concurrent.futures")
Brian Quinlan81c4d362010-09-18 22:35:02 +000044
class Error(Exception):
    """Root of the exception hierarchy used by the futures package."""
48
class CancelledError(Error):
    """Raised when a result is requested from a cancelled Future."""
52
class TimeoutError(Error):
    """Raised when an operation does not finish before its deadline."""
56
class _Waiter(object):
    """Base waiter that wait() and as_completed() block on.

    It owns the event the caller sleeps on and the list of futures that
    have reported completion. The base class only records futures;
    subclasses decide when the event is set.
    """

    def __init__(self):
        self.finished_futures = []
        self.event = threading.Event()

    def add_result(self, future):
        """Record a future that finished with a result."""
        self.finished_futures.append(future)

    def add_exception(self, future):
        """Record a future that finished by raising."""
        self.finished_futures.append(future)

    def add_cancelled(self, future):
        """Record a future whose cancellation has been acknowledged."""
        self.finished_futures.append(future)
71
class _AsCompletedWaiter(_Waiter):
    """Used by as_completed().

    Every notification is recorded and the event is set while holding
    ``lock``, so that as_completed() can swap out ``finished_futures`` and
    clear the event under the same lock.
    """

    def __init__(self):
        super().__init__()
        self.lock = threading.Lock()

    def add_result(self, future):
        with self.lock:
            super().add_result(future)
            self.event.set()

    def add_exception(self, future):
        with self.lock:
            super().add_exception(future)
            self.event.set()

    def add_cancelled(self, future):
        with self.lock:
            super().add_cancelled(future)
            self.event.set()
93
class _FirstCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_COMPLETED).

    Any notification at all (result, exception or cancellation) is
    recorded and then wakes the waiting caller.
    """

    def add_result(self, future):
        super(_FirstCompletedWaiter, self).add_result(future)
        self.event.set()

    def add_exception(self, future):
        super(_FirstCompletedWaiter, self).add_exception(future)
        self.event.set()

    def add_cancelled(self, future):
        super(_FirstCompletedWaiter, self).add_cancelled(future)
        self.event.set()
108
class _AllCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).

    Counts down the number of outstanding futures and sets the event when
    the count reaches zero, or as soon as one future raises when
    ``stop_on_exception`` is true.

    Args:
        num_pending_calls: Number of futures that have not completed yet.
        stop_on_exception: If true, the first exception sets the event
            immediately instead of being counted.
    """

    def __init__(self, num_pending_calls, stop_on_exception):
        self.num_pending_calls = num_pending_calls
        self.stop_on_exception = stop_on_exception
        # Each future notifies this waiter while holding only its own
        # condition, so notifications from different futures can run
        # concurrently; the counter needs its own lock.
        self.lock = threading.Lock()
        super().__init__()

    def _decrement_pending_calls(self):
        """Count one future as complete and wake the caller on the last."""
        # Without the lock two simultaneous decrements can lose an update,
        # leaving the counter above zero forever and wait() blocked.
        with self.lock:
            self.num_pending_calls -= 1
            if not self.num_pending_calls:
                self.event.set()

    def add_result(self, future):
        super().add_result(future)
        self._decrement_pending_calls()

    def add_exception(self, future):
        super().add_exception(future)
        if self.stop_on_exception:
            self.event.set()
        else:
            self._decrement_pending_calls()

    def add_cancelled(self, future):
        super().add_cancelled(future)
        self._decrement_pending_calls()
136
class _AcquireFutures(object):
    """Context manager that holds the conditions of several futures at once.

    The futures are sorted by id() so that every user of this class takes
    the conditions in the same order.
    """

    def __init__(self, futures):
        ordered = list(futures)
        ordered.sort(key=id)
        self.futures = ordered

    def __enter__(self):
        for fut in self.futures:
            fut._condition.acquire()

    def __exit__(self, *args):
        for fut in self.futures:
            fut._condition.release()
150
def _create_and_install_waiters(fs, return_when):
    """Build the waiter matching ``return_when`` and attach it to each future.

    Args:
        fs: The futures to attach the waiter to.
        return_when: One of _AS_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
            or ALL_COMPLETED.

    Returns:
        The newly created waiter, already appended to every future's
        ``_waiters`` list.

    Raises:
        ValueError: If ``return_when`` is not one of the known conditions.
    """
    if return_when == _AS_COMPLETED:
        waiter = _AsCompletedWaiter()
    elif return_when == FIRST_COMPLETED:
        waiter = _FirstCompletedWaiter()
    else:
        # Both remaining conditions count down the futures still outstanding.
        completed_states = [CANCELLED_AND_NOTIFIED, FINISHED]
        pending_count = sum(
            1 for f in fs if f._state not in completed_states)

        if return_when == FIRST_EXCEPTION:
            stop_on_exception = True
        elif return_when == ALL_COMPLETED:
            stop_on_exception = False
        else:
            raise ValueError("Invalid return condition: %r" % return_when)

        waiter = _AllCompletedWaiter(
            pending_count, stop_on_exception=stop_on_exception)

    for f in fs:
        f._waiters.append(waiter)

    return waiter
171
def as_completed(fs, timeout=None):
    """An iterator over the given futures that yields each as it completes.

    Args:
        fs: An iterable of Futures (possibly created by different
            Executors) to iterate over. Any iterable is accepted, including
            a one-shot iterator; a future that appears more than once is
            yielded only once.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished
        or cancelled).

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    if timeout is not None:
        end_time = timeout + time.time()

    # Materialise and de-duplicate the input. A one-shot iterator would be
    # exhausted by the first pass over it, and a future listed twice would
    # get the waiter installed twice, be reported twice, and make the
    # second pending.remove() raise KeyError.
    fs = set(fs)
    total_futures = len(fs)

    with _AcquireFutures(fs):
        finished = set(
            f for f in fs
            if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)

    try:
        for future in finished:
            yield future

        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.time()
                if wait_timeout < 0:
                    raise TimeoutError(
                        '%d (of %d) futures unfinished' % (
                            len(pending), total_futures))

            waiter.event.wait(wait_timeout)

            # Take the batch and reset the event under the waiter's lock so
            # a notification arriving in between is not lost.
            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            for future in finished:
                yield future
                pending.remove(future)

    finally:
        # Futures iterate _waiters while holding their condition, so the
        # list is only mutated under that same lock.
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)
227
# Return type of wait(): a 2-tuple of sets named ``done`` and ``not_done``.
DoneAndNotDoneFutures = collections.namedtuple(
    'DoneAndNotDoneFutures', 'done not_done')


def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures in the given sequence to complete.

    Args:
        fs: An iterable of Futures (possibly created by different
            Executors) to wait upon. Any iterable is accepted, including a
            one-shot iterator; duplicates are counted once.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is
                              cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED -   Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains the
        uncompleted futures.

    Raises:
        ValueError: If return_when is not one of the options above and a
            waiter has to be created.
    """
    # De-duplicate up front. The all-done check below compares len(done)
    # (a set) with len(fs); with a repeated future in a list they never
    # match, so a waiter with nothing left to count down would be installed
    # and never fire. Converting also lets len() work on iterators.
    fs = set(fs)

    with _AcquireFutures(fs):
        done = set(f for f in fs
                   if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        not_done = fs - done

        if (return_when == FIRST_COMPLETED) and done:
            return DoneAndNotDoneFutures(done, not_done)
        elif (return_when == FIRST_EXCEPTION) and done:
            if any(f for f in done
                   if not f.cancelled() and f.exception() is not None):
                return DoneAndNotDoneFutures(done, not_done)

        if len(done) == len(fs):
            return DoneAndNotDoneFutures(done, not_done)

        waiter = _create_and_install_waiters(fs, return_when)

    waiter.event.wait(timeout)

    # Futures iterate _waiters while holding their condition, so the list
    # is only mutated under that same lock.
    for f in fs:
        with f._condition:
            f._waiters.remove(waiter)

    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, fs - done)
277
class Future(object):
    """Represents the result of an asynchronous computation."""

    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        # Every method reads or changes the state while holding this
        # condition; result() and exception() also block on it.
        self._condition = threading.Condition()
        self._state = PENDING
        self._result = None
        self._exception = None
        # Waiter objects notified when this future completes or has its
        # cancellation acknowledged.
        self._waiters = []
        self._done_callbacks = []

    def _invoke_callbacks(self):
        """Call every registered done-callback, logging any that raise."""
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                LOGGER.exception('exception calling callback for %r', self)

    def __repr__(self):
        with self._condition:
            if self._state == FINISHED:
                if self._exception:
                    return '<Future at %s state=%s raised %s>' % (
                        hex(id(self)),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._exception.__class__.__name__)
                else:
                    return '<Future at %s state=%s returned %s>' % (
                        hex(id(self)),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._result.__class__.__name__)
            return '<Future at %s state=%s>' % (
                hex(id(self)),
                _STATE_TO_DESCRIPTION_MAP[self._state])

    def cancel(self):
        """Cancel the future if possible.

        Returns True if the future was cancelled, False otherwise. A future
        cannot be cancelled if it is running or has already completed.
        """
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                return True

            self._state = CANCELLED
            self._condition.notify_all()

        # Callbacks run after the condition has been released.
        self._invoke_callbacks()
        return True

    def cancelled(self):
        """Return True if the future was cancelled."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]

    def running(self):
        """Return True if the future is currently executing."""
        with self._condition:
            return self._state == RUNNING

    def done(self):
        """Return True if the future was cancelled or finished executing."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

    def __get_result(self):
        """Return the stored result, or raise the stored exception."""
        if self._exception:
            raise self._exception
        else:
            return self._result

    def add_done_callback(self, fn):
        """Attaches a callable that will be called when the future finishes.

        Args:
            fn: A callable that will be called with this future as its only
                argument when the future completes or is cancelled. The
                callable will always be called by a thread in the same
                process in which it was added. If the future has already
                completed or been cancelled then the callable will be called
                immediately. These callables are called in the order that
                they were added. An Exception raised by the callable is
                logged and ignored.
        """
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        # Already done: call immediately, outside the lock, with the same
        # log-and-ignore handling that deferred callbacks get.
        try:
            fn(self)
        except Exception:
            LOGGER.exception('exception calling callback for %r', self)

    def result(self, timeout=None):
        """Return the result of the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the result if the future
                isn't done. If None, then there is no limit on the wait time.

        Returns:
            The result of the call that the future represents.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
            Exception: If the call raised then that exception will be raised.
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()

            self._condition.wait(timeout)

            # Re-check after waking: the wait ends on notification or timeout.
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()
            else:
                raise TimeoutError()

    def exception(self, timeout=None):
        """Return the exception raised by the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the exception if the
                future isn't done. If None, then there is no limit on the wait
                time.

        Returns:
            The exception raised by the call that the future represents or None
            if the call completed without raising.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception

            self._condition.wait(timeout)

            # Re-check after waking: the wait ends on notification or timeout.
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception
            else:
                raise TimeoutError()

    # The following methods should only be used by Executors and in tests.
    def set_running_or_notify_cancel(self):
        """Mark the future as running or process any cancel notifications.

        Should only be used by Executor implementations and unit tests.

        If the future has been cancelled (cancel() was called and returned
        True) then any threads waiting on the future completing (through
        calls to as_completed() or wait()) are notified and False is returned.

        If the future was not cancelled then it is put in the running state
        (future calls to running() will return True) and True is returned.

        This method should be called by Executor implementations before
        executing the work associated with this future. If this method returns
        False then the work should not be executed.

        Returns:
            False if the Future was cancelled, True otherwise.

        Raises:
            RuntimeError: if this method was already called or if set_result()
                or set_exception() was called.
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                # Log this future's own identity and state; there is no
                # ``self.future`` attribute to read them from.
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')

    def set_result(self, result):
        """Sets the return value of work associated with the future.

        Should only be used by Executor implementations and unit tests.
        """
        with self._condition:
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    def set_exception(self, exception):
        """Sets the result of the future as being the given exception.

        Should only be used by Executor implementations and unit tests.
        """
        with self._condition:
            self._exception = exception
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_exception(self)
            self._condition.notify_all()
        self._invoke_callbacks()
501
class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and
        returns a Future instance representing the execution of the callable.
        The base class does not implement this.

        Returns:
            A Future representing the given call.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None):
        """Returns an iterator equivalent to map(fn, *iterables).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.

        Returns:
            An iterator equivalent to: map(fn, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        # The deadline is fixed before anything is submitted.
        if timeout is not None:
            end_time = timeout + time.time()

        submitted = []
        for call_args in zip(*iterables):
            submitted.append(self.submit(fn, *call_args))

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                for item in submitted:
                    if timeout is None:
                        yield item.result()
                    else:
                        yield item.result(end_time - time.time())
            finally:
                # Runs when the iterator is exhausted, closed or fails.
                for item in submitted:
                    item.cancel()

        return result_iterator()

    def shutdown(self, wait=True):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one. The base implementation does
        nothing.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Wait for outstanding work, then let any exception propagate.
        self.shutdown(wait=True)
        return False
571 return False