blob: ca3aebd9c768187f734b9a34f9b9aef58537bebc [file] [log] [blame]
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
3
4__author__ = 'Brian Quinlan (brian@sweetapp.com)'
5
6import collections
Brian Quinlan81c4d362010-09-18 22:35:02 +00007import logging
8import threading
9import time
10
# Return conditions accepted by wait(); _AS_COMPLETED is the private
# condition used by as_completed().
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
_AS_COMPLETED = '_AS_COMPLETED'

# Future life-cycle states (internal to the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# cancel() succeeded, but waiters have not been told yet...
CANCELLED = 'CANCELLED'
# ...and this is the state once a worker has called
# _Waiter.add_cancelled() for the future.
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'

_FUTURE_STATES = [
    PENDING,
    RUNNING,
    CANCELLED,
    CANCELLED_AND_NOTIFIED,
    FINISHED,
]

# Human-readable state names used by Future.__repr__(); both cancelled
# states are presented the same way.
_STATE_TO_DESCRIPTION_MAP = {
    PENDING: "pending",
    RUNNING: "running",
    CANCELLED: "cancelled",
    CANCELLED_AND_NOTIFIED: "cancelled",
    FINISHED: "finished",
}

# Package-internal logger.
LOGGER = logging.getLogger("concurrent.futures")
Brian Quinlan81c4d362010-09-18 22:35:02 +000043
class Error(Exception):
    """Root of the exception hierarchy raised by the futures package."""
47
class CancelledError(Error):
    """Raised when a result is requested from a cancelled Future."""
51
class TimeoutError(Error):
    """Raised when an operation did not complete within its deadline."""
55
class _Waiter(object):
    """Base waiter: owns the event that wait() and as_completed() block on.

    Every notification simply records the future; subclasses decide when
    the event is actually set.
    """

    def __init__(self):
        self.finished_futures = []
        self.event = threading.Event()

    def add_result(self, future):
        """Record a future that finished with a result."""
        self.finished_futures.append(future)

    def add_exception(self, future):
        """Record a future that finished by raising."""
        self.finished_futures.append(future)

    def add_cancelled(self, future):
        """Record a future whose cancellation has been notified."""
        self.finished_futures.append(future)
70
class _AsCompletedWaiter(_Waiter):
    """Waiter installed by as_completed().

    Each notification is recorded and the event set while holding
    ``self.lock``; as_completed() takes the same lock when it swaps out
    ``finished_futures`` and clears the event.
    """

    def __init__(self):
        super().__init__()
        self.lock = threading.Lock()

    def add_result(self, future):
        with self.lock:
            super().add_result(future)
            self.event.set()

    def add_exception(self, future):
        with self.lock:
            super().add_exception(future)
            self.event.set()

    def add_cancelled(self, future):
        with self.lock:
            super().add_cancelled(future)
            self.event.set()
92
class _FirstCompletedWaiter(_Waiter):
    """Waiter for wait(return_when=FIRST_COMPLETED).

    Any kind of completion (result, exception or cancellation) records
    the future and immediately sets the event.
    """

    def add_result(self, future):
        super().add_result(future)
        self.event.set()

    def add_exception(self, future):
        super().add_exception(future)
        self.event.set()

    def add_cancelled(self, future):
        super().add_cancelled(future)
        self.event.set()
107
class _AllCompletedWaiter(_Waiter):
    """Waiter for wait(return_when=FIRST_EXCEPTION) and ALL_COMPLETED.

    Counts down ``num_pending_calls`` and sets the event when it reaches
    zero; with ``stop_on_exception`` the first exception sets the event
    straight away instead.
    """

    def __init__(self, num_pending_calls, stop_on_exception):
        super().__init__()
        self.lock = threading.Lock()
        self.num_pending_calls = num_pending_calls
        self.stop_on_exception = stop_on_exception

    def _decrement_pending_calls(self):
        # The counter is updated under the lock so that concurrent
        # notifications cannot lose a decrement.
        with self.lock:
            self.num_pending_calls -= 1
            if self.num_pending_calls == 0:
                self.event.set()

    def add_result(self, future):
        super().add_result(future)
        self._decrement_pending_calls()

    def add_exception(self, future):
        super().add_exception(future)
        if not self.stop_on_exception:
            self._decrement_pending_calls()
            return
        self.event.set()

    def add_cancelled(self, future):
        super().add_cancelled(future)
        self._decrement_pending_calls()
137
class _AcquireFutures(object):
    """Context manager acquiring the conditions of several futures.

    The futures are ordered by id() once, at construction, and their
    ``_condition`` objects are acquired and released in that order.
    """

    def __init__(self, futures):
        ordered = list(futures)
        ordered.sort(key=id)
        self.futures = ordered

    def __enter__(self):
        for fut in self.futures:
            fut._condition.acquire()

    def __exit__(self, *args):
        for fut in self.futures:
            fut._condition.release()
151
def _create_and_install_waiters(fs, return_when):
    """Build the waiter matching *return_when* and attach it to every future.

    Args:
        fs: The futures to attach the waiter to; each one gets the waiter
            appended to its ``_waiters`` list.
        return_when: One of _AS_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
            or ALL_COMPLETED.

    Returns:
        The newly created waiter.

    Raises:
        ValueError: If *return_when* is not one of the values above.
    """
    if return_when == _AS_COMPLETED:
        waiter = _AsCompletedWaiter()
    elif return_when == FIRST_COMPLETED:
        waiter = _FirstCompletedWaiter()
    else:
        # Futures already in a notified-final state will never call back
        # into the waiter, so they must not be counted as pending.
        settled_states = [CANCELLED_AND_NOTIFIED, FINISHED]
        pending_count = sum(
            1 for f in fs if f._state not in settled_states)

        if return_when not in (FIRST_EXCEPTION, ALL_COMPLETED):
            raise ValueError("Invalid return condition: %r" % return_when)

        waiter = _AllCompletedWaiter(
            pending_count,
            stop_on_exception=(return_when == FIRST_EXCEPTION))

    for f in fs:
        f._waiters.append(waiter)

    return waiter
172
def as_completed(fs, timeout=None):
    """An iterator over the given futures that yields each as it completes.

    Args:
        fs: The sequence of Futures (possibly created by different Executors)
            to iterate over. Any iterable is accepted; a future that appears
            more than once is yielded only once.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished
        or cancelled).

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    if timeout is not None:
        end_time = timeout + time.time()

    # De-duplicate up front. With a repeated future the waiter would be
    # installed (and therefore notified) once per occurrence, and the second
    # notification would make pending.remove() below raise KeyError. This
    # also allows fs to be a one-shot iterable, since it is traversed
    # several times.
    fs = set(fs)
    with _AcquireFutures(fs):
        finished = set(
            f for f in fs
            if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)

    try:
        for future in finished:
            yield future

        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.time()
                if wait_timeout < 0:
                    raise TimeoutError(
                        '%d (of %d) futures unfinished' % (
                            len(pending), len(fs)))

            waiter.event.wait(wait_timeout)

            # Swap out the batch and clear the event under the waiter's
            # lock so that no notification arriving in between is lost.
            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            for future in finished:
                yield future
                pending.remove(future)

    finally:
        # Detach the waiter while holding each future's condition: the
        # set_* methods iterate over _waiters under that same condition.
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)
228
DoneAndNotDoneFutures = collections.namedtuple(
    'DoneAndNotDoneFutures', 'done not_done')


def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures in the given sequence to complete.

    Args:
        fs: The sequence of Futures (possibly created by different Executors)
            to wait upon. Any iterable is accepted; duplicates are ignored.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is
                              cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED -   Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains the
        uncompleted futures.

    Raises:
        ValueError: If a waiter has to be installed and return_when is not
            one of the options above.
    """
    # De-duplicate up front. Otherwise len(done) can never equal len(fs)
    # when fs repeats a future, and the waiter would be created with zero
    # pending calls, so its event would never be set and this call would
    # block until the timeout (or forever).
    fs = set(fs)
    with _AcquireFutures(fs):
        done = set(f for f in fs
                   if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        not_done = fs - done

        if (return_when == FIRST_COMPLETED) and done:
            return DoneAndNotDoneFutures(done, not_done)
        elif (return_when == FIRST_EXCEPTION) and done:
            if any(f for f in done
                   if not f.cancelled() and f.exception() is not None):
                return DoneAndNotDoneFutures(done, not_done)

        if len(done) == len(fs):
            return DoneAndNotDoneFutures(done, not_done)

        waiter = _create_and_install_waiters(fs, return_when)

    waiter.event.wait(timeout)
    # Detach the waiter while holding each future's condition: the set_*
    # methods iterate over _waiters under that same condition.
    for f in fs:
        with f._condition:
            f._waiters.remove(waiter)

    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, fs - done)
278
class Future(object):
    """Represents the result of an asynchronous computation."""

    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        # Guards every attribute below and is what result()/exception()
        # block on.
        self._condition = threading.Condition()
        self._state = PENDING
        self._result = None
        self._exception = None
        self._waiters = []
        self._done_callbacks = []

    def _invoke_callbacks(self):
        """Call every registered done-callback, logging any failures."""
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                # Deliberately broad: one failing callback must not stop
                # the remaining callbacks from running.
                LOGGER.exception('exception calling callback for %r', self)

    def __repr__(self):
        with self._condition:
            if self._state == FINISHED:
                # Compare against None rather than relying on truthiness,
                # so a falsy exception object is still reported.
                if self._exception is not None:
                    return '<Future at %s state=%s raised %s>' % (
                        hex(id(self)),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._exception.__class__.__name__)
                else:
                    return '<Future at %s state=%s returned %s>' % (
                        hex(id(self)),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._result.__class__.__name__)
            return '<Future at %s state=%s>' % (
                hex(id(self)),
                _STATE_TO_DESCRIPTION_MAP[self._state])

    def cancel(self):
        """Cancel the future if possible.

        Returns True if the future was cancelled, False otherwise. A future
        cannot be cancelled if it is running or has already completed.
        """
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                return True

            self._state = CANCELLED
            self._condition.notify_all()

        # Callbacks run after the condition has been released.
        self._invoke_callbacks()
        return True

    def cancelled(self):
        """Return True if the future was cancelled."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]

    def running(self):
        """Return True if the future is currently executing."""
        with self._condition:
            return self._state == RUNNING

    def done(self):
        """Return True if the future was cancelled or finished executing."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

    def __get_result(self):
        """Return the stored result, or raise the stored exception."""
        if self._exception is not None:
            raise self._exception
        else:
            return self._result

    def add_done_callback(self, fn):
        """Attaches a callable that will be called when the future finishes.

        Args:
            fn: A callable that will be called with this future as its only
                argument when the future completes or is cancelled. The
                callable will always be called by a thread in the same
                process in which it was added. If the future has already
                completed or been cancelled then the callable will be called
                immediately. These callables are called in the order that
                they were added. An Exception raised by the callable is
                logged and ignored.
        """
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        # Already done: call immediately, outside the condition, with the
        # same log-and-ignore handling as _invoke_callbacks() so a failing
        # callback behaves the same whenever it was registered.
        try:
            fn(self)
        except Exception:
            LOGGER.exception('exception calling callback for %r', self)

    def result(self, timeout=None):
        """Return the result of the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the result if the
                future isn't done. If None, then there is no limit on the
                wait time.

        Returns:
            The result of the call that the future represents.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the
                given timeout.
            Exception: If the call raised then that exception will be raised.
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()

            self._condition.wait(timeout)

            # Re-check after the wait: if the state is still not final the
            # wait is treated as having timed out.
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()
            else:
                raise TimeoutError()

    def exception(self, timeout=None):
        """Return the exception raised by the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the exception if the
                future isn't done. If None, then there is no limit on the
                wait time.

        Returns:
            The exception raised by the call that the future represents or
            None if the call completed without raising.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the
                given timeout.
        """

        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception
            else:
                raise TimeoutError()

    # The following methods should only be used by Executors and in tests.
    def set_running_or_notify_cancel(self):
        """Mark the future as running or process any cancel notifications.

        Should only be used by Executor implementations and unit tests.

        If the future has been cancelled (cancel() was called and returned
        True) then any threads waiting on the future completing (through
        calls to as_completed() or wait()) are notified and False is
        returned.

        If the future was not cancelled then it is put in the running state
        (future calls to running() will return True) and True is returned.

        This method should be called by Executor implementations before
        executing the work associated with this future. If this method
        returns False then the work should not be executed.

        Returns:
            False if the Future was cancelled, True otherwise.

        Raises:
            RuntimeError: if this method was already called or if
                set_result() or set_exception() was called.
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')

    def set_result(self, result):
        """Sets the return value of work associated with the future.

        Should only be used by Executor implementations and unit tests.
        """
        with self._condition:
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        # Callbacks run after the condition has been released.
        self._invoke_callbacks()

    def set_exception(self, exception):
        """Sets the result of the future as being the given exception.

        Should only be used by Executor implementations and unit tests.
        """
        with self._condition:
            self._exception = exception
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_exception(self)
            self._condition.notify_all()
        # Callbacks run after the condition has been released.
        self._invoke_callbacks()
502
class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and
        returns a Future instance representing the execution of the callable.
        This base implementation always raises NotImplementedError.

        Returns:
            A Future representing the given call.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None):
        """Returns an iterator equivalent to map(fn, *iterables).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then
                there is no limit on the wait time.

        Returns:
            An iterator equivalent to: map(fn, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be
                generated before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        # The deadline is fixed before anything is submitted.
        if timeout is not None:
            end_time = timeout + time.time()

        fs = []
        for args in zip(*iterables):
            fs.append(self.submit(fn, *args))

        # The yield lives in a nested generator so that all the submit()
        # calls above happen when map() is called, not when the first
        # value is requested.
        def result_iterator():
            try:
                for future in fs:
                    if timeout is not None:
                        yield future.result(end_time - time.time())
                    else:
                        yield future.result()
            finally:
                # Runs when the iterator is exhausted, closed or fails:
                # ask every submitted future to cancel.
                for future in fs:
                    future.cancel()

        return result_iterator()

    def shutdown(self, wait=True):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one. This base implementation does
        nothing.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by
                the executor have been reclaimed.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always wait for shutdown; returning False never suppresses an
        # exception raised inside the with-block.
        self.shutdown(wait=True)
        return False