blob: 0bbb85bd462033a42599329ab03d549460f3c967 [file] [log] [blame]
Brian Quinlan81c4d362010-09-18 22:35:02 +00001# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4__author__ = 'Brian Quinlan (brian@sweetapp.com)'
5
6import collections
7import functools
8import logging
9import threading
10import time
11
12FIRST_COMPLETED = 'FIRST_COMPLETED'
13FIRST_EXCEPTION = 'FIRST_EXCEPTION'
14ALL_COMPLETED = 'ALL_COMPLETED'
Brian Quinlan3ec60182010-11-17 11:06:29 +000015_AS_COMPLETED = '_AS_COMPLETED'
Brian Quinlan81c4d362010-09-18 22:35:02 +000016
17# Possible future states (for internal use by the futures package).
18PENDING = 'PENDING'
19RUNNING = 'RUNNING'
20# The future was cancelled by the user...
21CANCELLED = 'CANCELLED'
22# ...and _Waiter.add_cancelled() was called by a worker.
23CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
24FINISHED = 'FINISHED'
25
26_FUTURE_STATES = [
27 PENDING,
28 RUNNING,
29 CANCELLED,
30 CANCELLED_AND_NOTIFIED,
31 FINISHED
32]
33
34_STATE_TO_DESCRIPTION_MAP = {
35 PENDING: "pending",
36 RUNNING: "running",
37 CANCELLED: "cancelled",
38 CANCELLED_AND_NOTIFIED: "cancelled",
39 FINISHED: "finished"
40}
41
42# Logger for internal use by the futures package.
43LOGGER = logging.getLogger("concurrent.futures")
Brian Quinlan1e2ae4f2010-10-06 13:05:45 +000044STDERR_HANDLER = logging.StreamHandler()
45LOGGER.addHandler(STDERR_HANDLER)
Brian Quinlan81c4d362010-09-18 22:35:02 +000046
47class Error(Exception):
48 """Base class for all future-related exceptions."""
49 pass
50
51class CancelledError(Error):
52 """The Future was cancelled."""
53 pass
54
55class TimeoutError(Error):
56 """The operation exceeded the given deadline."""
57 pass
58
59class _Waiter(object):
60 """Provides the event that wait() and as_completed() block on."""
61 def __init__(self):
62 self.event = threading.Event()
63 self.finished_futures = []
64
65 def add_result(self, future):
66 self.finished_futures.append(future)
67
68 def add_exception(self, future):
69 self.finished_futures.append(future)
70
71 def add_cancelled(self, future):
72 self.finished_futures.append(future)
73
Brian Quinlan3ec60182010-11-17 11:06:29 +000074class _AsCompletedWaiter(_Waiter):
75 """Used by as_completed()."""
76
77 def __init__(self):
78 super(_AsCompletedWaiter, self).__init__()
79 self.lock = threading.Lock()
80
81 def add_result(self, future):
82 with self.lock:
83 super(_AsCompletedWaiter, self).add_result(future)
84 self.event.set()
85
86 def add_exception(self, future):
87 with self.lock:
88 super(_AsCompletedWaiter, self).add_exception(future)
89 self.event.set()
90
91 def add_cancelled(self, future):
92 with self.lock:
93 super(_AsCompletedWaiter, self).add_cancelled(future)
94 self.event.set()
95
Brian Quinlan81c4d362010-09-18 22:35:02 +000096class _FirstCompletedWaiter(_Waiter):
Brian Quinlan3ec60182010-11-17 11:06:29 +000097 """Used by wait(return_when=FIRST_COMPLETED)."""
Brian Quinlan81c4d362010-09-18 22:35:02 +000098
99 def add_result(self, future):
100 super().add_result(future)
101 self.event.set()
102
103 def add_exception(self, future):
104 super().add_exception(future)
105 self.event.set()
106
107 def add_cancelled(self, future):
108 super().add_cancelled(future)
109 self.event.set()
110
111class _AllCompletedWaiter(_Waiter):
112 """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""
113
114 def __init__(self, num_pending_calls, stop_on_exception):
115 self.num_pending_calls = num_pending_calls
116 self.stop_on_exception = stop_on_exception
117 super().__init__()
118
119 def _decrement_pending_calls(self):
120 self.num_pending_calls -= 1
121 if not self.num_pending_calls:
122 self.event.set()
123
124 def add_result(self, future):
125 super().add_result(future)
126 self._decrement_pending_calls()
127
128 def add_exception(self, future):
129 super().add_exception(future)
130 if self.stop_on_exception:
131 self.event.set()
132 else:
133 self._decrement_pending_calls()
134
135 def add_cancelled(self, future):
136 super().add_cancelled(future)
137 self._decrement_pending_calls()
138
139class _AcquireFutures(object):
140 """A context manager that does an ordered acquire of Future conditions."""
141
142 def __init__(self, futures):
143 self.futures = sorted(futures, key=id)
144
145 def __enter__(self):
146 for future in self.futures:
147 future._condition.acquire()
148
149 def __exit__(self, *args):
150 for future in self.futures:
151 future._condition.release()
152
153def _create_and_install_waiters(fs, return_when):
Brian Quinlan3ec60182010-11-17 11:06:29 +0000154 if return_when == _AS_COMPLETED:
155 waiter = _AsCompletedWaiter()
156 elif return_when == FIRST_COMPLETED:
Brian Quinlan81c4d362010-09-18 22:35:02 +0000157 waiter = _FirstCompletedWaiter()
158 else:
159 pending_count = sum(
160 f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
161
162 if return_when == FIRST_EXCEPTION:
163 waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
164 elif return_when == ALL_COMPLETED:
165 waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
166 else:
167 raise ValueError("Invalid return condition: %r" % return_when)
168
169 for f in fs:
170 f._waiters.append(waiter)
171
172 return waiter
173
174def as_completed(fs, timeout=None):
175 """An iterator over the given futures that yields each as it completes.
176
177 Args:
178 fs: The sequence of Futures (possibly created by different Executors) to
179 iterate over.
180 timeout: The maximum number of seconds to wait. If None, then there
181 is no limit on the wait time.
182
183 Returns:
184 An iterator that yields the given Futures as they complete (finished or
185 cancelled).
186
187 Raises:
188 TimeoutError: If the entire result iterator could not be generated
189 before the given timeout.
190 """
191 if timeout is not None:
192 end_time = timeout + time.time()
193
194 with _AcquireFutures(fs):
195 finished = set(
196 f for f in fs
197 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
198 pending = set(fs) - finished
Brian Quinlan3ec60182010-11-17 11:06:29 +0000199 waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000200
201 try:
202 for future in finished:
203 yield future
204
205 while pending:
206 if timeout is None:
207 wait_timeout = None
208 else:
209 wait_timeout = end_time - time.time()
210 if wait_timeout < 0:
211 raise TimeoutError(
212 '%d (of %d) futures unfinished' % (
213 len(pending), len(fs)))
214
Brian Quinlan3ec60182010-11-17 11:06:29 +0000215 waiter.event.wait(wait_timeout)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000216
Brian Quinlan3ec60182010-11-17 11:06:29 +0000217 with waiter.lock:
218 finished = waiter.finished_futures
219 waiter.finished_futures = []
220 waiter.event.clear()
221
222 for future in finished:
Brian Quinlan81c4d362010-09-18 22:35:02 +0000223 yield future
Brian Quinlan81c4d362010-09-18 22:35:02 +0000224 pending.remove(future)
225
226 finally:
227 for f in fs:
228 f._waiters.remove(waiter)
229
230DoneAndNotDoneFutures = collections.namedtuple(
231 'DoneAndNotDoneFutures', 'done not_done')
232def wait(fs, timeout=None, return_when=ALL_COMPLETED):
233 """Wait for the futures in the given sequence to complete.
234
235 Args:
236 fs: The sequence of Futures (possibly created by different Executors) to
237 wait upon.
238 timeout: The maximum number of seconds to wait. If None, then there
239 is no limit on the wait time.
240 return_when: Indicates when this function should return. The options
241 are:
242
243 FIRST_COMPLETED - Return when any future finishes or is
244 cancelled.
245 FIRST_EXCEPTION - Return when any future finishes by raising an
246 exception. If no future raises an exception
247 then it is equivalent to ALL_COMPLETED.
248 ALL_COMPLETED - Return when all futures finish or are cancelled.
249
250 Returns:
251 A named 2-tuple of sets. The first set, named 'done', contains the
252 futures that completed (is finished or cancelled) before the wait
253 completed. The second set, named 'not_done', contains uncompleted
254 futures.
255 """
256 with _AcquireFutures(fs):
257 done = set(f for f in fs
258 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
259 not_done = set(fs) - done
260
261 if (return_when == FIRST_COMPLETED) and done:
262 return DoneAndNotDoneFutures(done, not_done)
263 elif (return_when == FIRST_EXCEPTION) and done:
264 if any(f for f in done
265 if not f.cancelled() and f.exception() is not None):
266 return DoneAndNotDoneFutures(done, not_done)
267
268 if len(done) == len(fs):
269 return DoneAndNotDoneFutures(done, not_done)
270
271 waiter = _create_and_install_waiters(fs, return_when)
272
273 waiter.event.wait(timeout)
274 for f in fs:
275 f._waiters.remove(waiter)
276
277 done.update(waiter.finished_futures)
278 return DoneAndNotDoneFutures(done, set(fs) - done)
279
280class Future(object):
281 """Represents the result of an asynchronous computation."""
282
283 def __init__(self):
284 """Initializes the future. Should not be called by clients."""
285 self._condition = threading.Condition()
286 self._state = PENDING
287 self._result = None
288 self._exception = None
289 self._waiters = []
290 self._done_callbacks = []
291
292 def _invoke_callbacks(self):
293 for callback in self._done_callbacks:
294 try:
295 callback(self)
296 except Exception:
297 LOGGER.exception('exception calling callback for %r', self)
298
299 def __repr__(self):
300 with self._condition:
301 if self._state == FINISHED:
302 if self._exception:
303 return '<Future at %s state=%s raised %s>' % (
304 hex(id(self)),
305 _STATE_TO_DESCRIPTION_MAP[self._state],
306 self._exception.__class__.__name__)
307 else:
308 return '<Future at %s state=%s returned %s>' % (
309 hex(id(self)),
310 _STATE_TO_DESCRIPTION_MAP[self._state],
311 self._result.__class__.__name__)
312 return '<Future at %s state=%s>' % (
313 hex(id(self)),
314 _STATE_TO_DESCRIPTION_MAP[self._state])
315
316 def cancel(self):
317 """Cancel the future if possible.
318
319 Returns True if the future was cancelled, False otherwise. A future
320 cannot be cancelled if it is running or has already completed.
321 """
322 with self._condition:
323 if self._state in [RUNNING, FINISHED]:
324 return False
325
326 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
327 return True
328
329 self._state = CANCELLED
330 self._condition.notify_all()
331
332 self._invoke_callbacks()
333 return True
334
335 def cancelled(self):
336 """Return True if the future has cancelled."""
337 with self._condition:
338 return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
339
340 def running(self):
341 """Return True if the future is currently executing."""
342 with self._condition:
343 return self._state == RUNNING
344
345 def done(self):
346 """Return True of the future was cancelled or finished executing."""
347 with self._condition:
348 return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
349
350 def __get_result(self):
351 if self._exception:
352 raise self._exception
353 else:
354 return self._result
355
356 def add_done_callback(self, fn):
357 """Attaches a callable that will be called when the future finishes.
358
359 Args:
360 fn: A callable that will be called with this future as its only
361 argument when the future completes or is cancelled. The callable
362 will always be called by a thread in the same process in which
363 it was added. If the future has already completed or been
364 cancelled then the callable will be called immediately. These
365 callables are called in the order that they were added.
366 """
367 with self._condition:
368 if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
369 self._done_callbacks.append(fn)
370 return
371 fn(self)
372
373 def result(self, timeout=None):
374 """Return the result of the call that the future represents.
375
376 Args:
377 timeout: The number of seconds to wait for the result if the future
378 isn't done. If None, then there is no limit on the wait time.
379
380 Returns:
381 The result of the call that the future represents.
382
383 Raises:
384 CancelledError: If the future was cancelled.
385 TimeoutError: If the future didn't finish executing before the given
386 timeout.
387 Exception: If the call raised then that exception will be raised.
388 """
389 with self._condition:
390 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
391 raise CancelledError()
392 elif self._state == FINISHED:
393 return self.__get_result()
394
395 self._condition.wait(timeout)
396
397 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
398 raise CancelledError()
399 elif self._state == FINISHED:
400 return self.__get_result()
401 else:
402 raise TimeoutError()
403
404 def exception(self, timeout=None):
405 """Return the exception raised by the call that the future represents.
406
407 Args:
408 timeout: The number of seconds to wait for the exception if the
409 future isn't done. If None, then there is no limit on the wait
410 time.
411
412 Returns:
413 The exception raised by the call that the future represents or None
414 if the call completed without raising.
415
416 Raises:
417 CancelledError: If the future was cancelled.
418 TimeoutError: If the future didn't finish executing before the given
419 timeout.
420 """
421
422 with self._condition:
423 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
424 raise CancelledError()
425 elif self._state == FINISHED:
426 return self._exception
427
428 self._condition.wait(timeout)
429
430 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
431 raise CancelledError()
432 elif self._state == FINISHED:
433 return self._exception
434 else:
435 raise TimeoutError()
436
437 # The following methods should only be used by Executors and in tests.
438 def set_running_or_notify_cancel(self):
439 """Mark the future as running or process any cancel notifications.
440
441 Should only be used by Executor implementations and unit tests.
442
443 If the future has been cancelled (cancel() was called and returned
444 True) then any threads waiting on the future completing (though calls
445 to as_completed() or wait()) are notified and False is returned.
446
447 If the future was not cancelled then it is put in the running state
448 (future calls to running() will return True) and True is returned.
449
450 This method should be called by Executor implementations before
451 executing the work associated with this future. If this method returns
452 False then the work should not be executed.
453
454 Returns:
455 False if the Future was cancelled, True otherwise.
456
457 Raises:
458 RuntimeError: if this method was already called or if set_result()
459 or set_exception() was called.
460 """
461 with self._condition:
462 if self._state == CANCELLED:
463 self._state = CANCELLED_AND_NOTIFIED
464 for waiter in self._waiters:
465 waiter.add_cancelled(self)
466 # self._condition.notify_all() is not necessary because
467 # self.cancel() triggers a notification.
468 return False
469 elif self._state == PENDING:
470 self._state = RUNNING
471 return True
472 else:
473 LOGGER.critical('Future %s in unexpected state: %s',
474 id(self.future),
475 self.future._state)
476 raise RuntimeError('Future in unexpected state')
477
478 def set_result(self, result):
479 """Sets the return value of work associated with the future.
480
481 Should only be used by Executor implementations and unit tests.
482 """
483 with self._condition:
484 self._result = result
485 self._state = FINISHED
486 for waiter in self._waiters:
487 waiter.add_result(self)
488 self._condition.notify_all()
489 self._invoke_callbacks()
490
491 def set_exception(self, exception):
492 """Sets the result of the future as being the given exception.
493
494 Should only be used by Executor implementations and unit tests.
495 """
496 with self._condition:
497 self._exception = exception
498 self._state = FINISHED
499 for waiter in self._waiters:
500 waiter.add_exception(self)
501 self._condition.notify_all()
502 self._invoke_callbacks()
503
504class Executor(object):
505 """This is an abstract base class for concrete asynchronous executors."""
506
507 def submit(self, fn, *args, **kwargs):
508 """Submits a callable to be executed with the given arguments.
509
510 Schedules the callable to be executed as fn(*args, **kwargs) and returns
511 a Future instance representing the execution of the callable.
512
513 Returns:
514 A Future representing the given call.
515 """
516 raise NotImplementedError()
517
518 def map(self, fn, *iterables, timeout=None):
519 """Returns a iterator equivalent to map(fn, iter).
520
521 Args:
522 fn: A callable that will take take as many arguments as there are
523 passed iterables.
524 timeout: The maximum number of seconds to wait. If None, then there
525 is no limit on the wait time.
526
527 Returns:
528 An iterator equivalent to: map(func, *iterables) but the calls may
529 be evaluated out-of-order.
530
531 Raises:
532 TimeoutError: If the entire result iterator could not be generated
533 before the given timeout.
534 Exception: If fn(*args) raises for any values.
535 """
536 if timeout is not None:
537 end_time = timeout + time.time()
538
539 fs = [self.submit(fn, *args) for args in zip(*iterables)]
540
541 try:
542 for future in fs:
543 if timeout is None:
544 yield future.result()
545 else:
546 yield future.result(end_time - time.time())
547 finally:
548 for future in fs:
549 future.cancel()
550
551 def shutdown(self, wait=True):
552 """Clean-up the resources associated with the Executor.
553
554 It is safe to call this method several times. Otherwise, no other
555 methods can be called after this one.
556
557 Args:
558 wait: If True then shutdown will not return until all running
559 futures have finished executing and the resources used by the
560 executor have been reclaimed.
561 """
562 pass
563
564 def __enter__(self):
565 return self
566
567 def __exit__(self, exc_type, exc_val, exc_tb):
568 self.shutdown(wait=True)
569 return False