blob: abb3dc5c321d3b4bc9f1b65192f2ec11994ec54d [file] [log] [blame]
Brian Quinlan81c4d362010-09-18 22:35:02 +00001:mod:`concurrent.futures` --- Concurrent computation
2====================================================
3
4.. module:: concurrent.futures
5 :synopsis: Execute computations concurrently using threads or processes.
6
7The :mod:`concurrent.futures` module provides a high-level interface for
8asynchronously executing callables.
9
10The asynchronous execution can be be performed by threads using
11:class:`ThreadPoolExecutor` or seperate processes using
12:class:`ProcessPoolExecutor`. Both implement the same interface, which is
13defined by the abstract :class:`Executor` class.
14
15Executor Objects
16^^^^^^^^^^^^^^^^
17
18:class:`Executor` is an abstract class that provides methods to execute calls
19asynchronously. It should not be used directly, but through its two
20subclasses: :class:`ThreadPoolExecutor` and :class:`ProcessPoolExecutor`.
21
22.. class:: Executor()
23
24 An abstract class that provides methods to execute calls asynchronously. It
25 should not be used directly, but through its two subclasses:
26 :class:`ThreadPoolExecutor` and :class:`ProcessPoolExecutor`.
27
28 .. method:: submit(fn, *args, **kwargs)
29
30 Schedules the callable to be executed as *fn*(*\*args*, *\*\*kwargs*) and
31 returns a :class:`Future` representing the execution of the callable.
32
33 ::
34
35 with ThreadPoolExecutor(max_workers=1) as executor:
36 future = executor.submit(pow, 323, 1235)
37 print(future.result())
38
39 .. method:: map(func, *iterables, timeout=None)
40
41 Equivalent to `map(*func*, *\*iterables*)` but func is executed
42 asynchronously and several calls to *func* may be made concurrently. The
43 returned iterator raises a :exc:`TimeoutError` if :meth:`__next__()` is
44 called and the result isn't available after *timeout* seconds from the
45 original call to :meth:`Executor.map()`. *timeout* can be an int or
46 float. If *timeout* is not specified or ``None`` then there is no limit
47 to the wait time. If a call raises an exception then that exception will
48 be raised when its value is retrieved from the iterator.
49
50 .. method:: shutdown(wait=True)
51
52 Signal the executor that it should free any resources that it is using
53 when the currently pending futures are done executing. Calls to
54 :meth:`Executor.submit` and :meth:`Executor.map` made after shutdown will
55 raise :exc:`RuntimeError`.
56
57 If *wait* is `True` then this method will not return until all the
58 pending futures are done executing and the resources associated with the
59 executor have been freed. If *wait* is `False` then this method will
60 return immediately and the resources associated with the executor will
61 be freed when all pending futures are done executing. Regardless of the
62 value of *wait*, the entire Python program will not exit until all
63 pending futures are done executing.
64
65 You can avoid having to call this method explicitly if you use the `with`
66 statement, which will shutdown the `Executor` (waiting as if
67 `Executor.shutdown` were called with *wait* set to `True`):
68
69 ::
70
71 import shutil
72 with ThreadPoolExecutor(max_workers=4) as e:
73 e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
74 e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
75 e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
76 e.submit(shutil.copy, 'src3.txt', 'dest4.txt')
77
78ThreadPoolExecutor
79^^^^^^^^^^^^^^^^^^
80
81The :class:`ThreadPoolExecutor` class is an :class:`Executor` subclass that uses
82a pool of threads to execute calls asynchronously.
83
84Deadlock can occur when the callable associated with a :class:`Future` waits on
85the results of another :class:`Future`. For example:
86
87::
88
89 import time
90 def wait_on_b():
91 time.sleep(5)
92 print(b.result()) # b will never complete because it is waiting on a.
93 return 5
94
95 def wait_on_a():
96 time.sleep(5)
97 print(a.result()) # a will never complete because it is waiting on b.
98 return 6
99
100
101 executor = ThreadPoolExecutor(max_workers=2)
102 a = executor.submit(wait_on_b)
103 b = executor.submit(wait_on_a)
104
105And:
106
107::
108
109 def wait_on_future():
110 f = executor.submit(pow, 5, 2)
111 # This will never complete because there is only one worker thread and
112 # it is executing this function.
113 print(f.result())
114
115 executor = ThreadPoolExecutor(max_workers=1)
116 executor.submit(wait_on_future)
117
118
119.. class:: ThreadPoolExecutor(max_workers)
120
121 An :class:`Executor` subclass that uses a pool of at most *max_workers*
122 threads to execute calls asynchronously.
123
124 Deadlock can occur when the callable associated with a :class:`Future` waits
125 on the results of another :class:`Future`.
126
127.. _threadpoolexecutor-example:
128
129ThreadPoolExecutor Example
130^^^^^^^^^^^^^^^^^^^^^^^^^^
131::
132
133 import concurrent.futures
134 import urllib.request
135
136 URLS = ['http://www.foxnews.com/',
137 'http://www.cnn.com/',
138 'http://europe.wsj.com/',
139 'http://www.bbc.co.uk/',
140 'http://some-made-up-domain.com/']
141
142 def load_url(url, timeout):
143 return urllib.request.urlopen(url, timeout=timeout).read()
144
145 with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
146 future_to_url = dict((executor.submit(load_url, url, 60), url)
147 for url in URLS)
148
149 for future in concurrent.futures.as_completed(future_to_url):
150 url = future_to_url[future]
151 if future.exception() is not None:
152 print('%r generated an exception: %s' % (url,
153 future.exception()))
154 else:
155 print('%r page is %d bytes' % (url, len(future.result())))
156
157
158ProcessPoolExecutor
159^^^^^^^^^^^^^^^^^^^
160
161The :class:`ProcessPoolExecutor` class is an :class:`Executor` subclass that
162uses a pool of processes to execute calls asynchronously.
163:class:`ProcessPoolExecutor` uses the :mod:`multiprocessing` module, which
164allows it to side-step the :term:`Global Interpreter Lock` but also means that
165only picklable objects can be executed and returned.
166
167Calling :class:`Executor` or :class:`Future` methods from a callable submitted
168to a :class:`ProcessPoolExecutor` will result in deadlock.
169
170.. class:: ProcessPoolExecutor(max_workers=None)
171
172 An :class:`Executor` subclass that executes calls asynchronously using a
173 pool of at most *max_workers* processes. If *max_workers* is ``None`` or
174 not given then as many worker processes will be created as the machine has
175 processors.
176
177.. _processpoolexecutor-example:
178
179ProcessPoolExecutor Example
180^^^^^^^^^^^^^^^^^^^^^^^^^^^
181::
182
183 import concurrent.futures
184 import math
185
186 PRIMES = [
187 112272535095293,
188 112582705942171,
189 112272535095293,
190 115280095190773,
191 115797848077099,
192 1099726899285419]
193
194 def is_prime(n):
195 if n % 2 == 0:
196 return False
197
198 sqrt_n = int(math.floor(math.sqrt(n)))
199 for i in range(3, sqrt_n + 1, 2):
200 if n % i == 0:
201 return False
202 return True
203
204 def main():
205 with concurrent.futures.ProcessPoolExecutor() as executor:
206 for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
207 print('%d is prime: %s' % (number, prime))
208
209 if __name__ == '__main__':
210 main()
211
212Future Objects
213^^^^^^^^^^^^^^
214
215The :class:`Future` class encapulates the asynchronous execution of a callable.
216:class:`Future` instances are created by :meth:`Executor.submit`.
217
218.. class:: Future()
219
220 Encapulates the asynchronous execution of a callable. :class:`Future`
221 instances are created by :meth:`Executor.submit` and should not be created
222 directly except for testing.
223
224 .. method:: cancel()
225
226 Attempt to cancel the call. If the call is currently being executed then
227 it cannot be cancelled and the method will return `False`, otherwise the
228 call will be cancelled and the method will return `True`.
229
230 .. method:: cancelled()
231
232 Return `True` if the call was successfully cancelled.
233
234 .. method:: running()
235
236 Return `True` if the call is currently being executed and cannot be
237 cancelled.
238
239 .. method:: done()
240
241 Return `True` if the call was successfully cancelled or finished running.
242
243 .. method:: result(timeout=None)
244
245 Return the value returned by the call. If the call hasn't yet completed
246 then this method will wait up to *timeout* seconds. If the call hasn't
247 completed in *timeout* seconds then a :exc:`TimeoutError` will be
248 raised. *timeout* can be an int or float.If *timeout* is not specified
249 or ``None`` then there is no limit to the wait time.
250
251 If the future is cancelled before completing then :exc:`CancelledError`
252 will be raised.
253
254 If the call raised then this method will raise the same exception.
255
256 .. method:: exception(timeout=None)
257
258 Return the exception raised by the call. If the call hasn't yet completed
259 then this method will wait up to *timeout* seconds. If the call hasn't
260 completed in *timeout* seconds then a :exc:`TimeoutError` will be raised.
261 *timeout* can be an int or float. If *timeout* is not specified or
262 ``None`` then there is no limit to the wait time.
263
264 If the future is cancelled before completing then :exc:`CancelledError`
265 will be raised.
266
267 If the call completed without raising then ``None`` is returned.
268
269 .. method:: add_done_callback(fn)
270
271 Attaches the callable *fn* to the future. *fn* will be called, with the
272 future as its only argument, when the future is cancelled or finishes
273 running.
274
275 Added callables are called in the order that they were added and are
276 always called in a thread belonging to the process that added them. If
277 the callable raises an :exc:`Exception` then it will be logged and
278 ignored. If the callable raises another :exc:`BaseException` then the
279 behavior is not defined.
280
281 If the future has already completed or been cancelled then *fn* will be
282 called immediately.
283
284 The following :class:`Future` methods are meant for use in unit tests and
285 :class:`Executor` implementations.
286
287 .. method:: set_running_or_notify_cancel()
288
289 This method should only be called by :class:`Executor` implementations
290 before executing the work associated with the :class:`Future` and by
291 unit tests.
292
293 If the method returns `False` then the :class:`Future` was cancelled i.e.
294 :meth:`Future.cancel` was called and returned `True`. Any threads waiting
295 on the :class:`Future` completing (i.e. through :func:`as_completed` or
296 :func:`wait`) will be woken up.
297
298 If the method returns `True` then the :class:`Future` was not cancelled
299 and has been put in the running state i.e. calls to
300 :meth:`Future.running` will return `True`.
301
302 This method can only be called once and cannot be called after
303 :meth:`Future.set_result` or :meth:`Future.set_exception` have been
304 called.
305
306 .. method:: set_result(result)
307
308 Sets the result of the work associated with the :class:`Future` to
309 *result*.
310
311 This method should only be used by :class:`Executor` implementations and
312 unit tests.
313
314 .. method:: set_exception(exception)
315
316 Sets the result of the work associated with the :class:`Future` to the
317 :class:`Exception` *exception*.
318
319 This method should only be used by :class:`Executor` implementations and
320 unit tests.
321
322
323Module Functions
324^^^^^^^^^^^^^^^^
325
326.. function:: wait(fs, timeout=None, return_when=ALL_COMPLETED)
327
328 Wait for the :class:`Future` instances (possibly created by different
329 :class:`Executor` instances) given by *fs* to complete. Returns a named
330 2-tuple of sets. The first set, named "done", contains the futures that
331 completed (finished or were cancelled) before the wait completed. The second
332 set, named "not_done", contains uncompleted futures.
333
334 *timeout* can be used to control the maximum number of seconds to wait before
335 returning. *timeout* can be an int or float. If *timeout* is not specified or
336 ``None`` then there is no limit to the wait time.
337
338 *return_when* indicates when this function should return. It must be one of
339 the following constants:
340
341 +-----------------------------+----------------------------------------+
342 | Constant | Description |
343 +=============================+========================================+
344 | :const:`FIRST_COMPLETED` | The function will return when any |
345 | | future finishes or is cancelled. |
346 +-----------------------------+----------------------------------------+
347 | :const:`FIRST_EXCEPTION` | The function will return when any |
348 | | future finishes by raising an |
349 | | exception. If no future raises an |
350 | | exception then it is equivalent to |
351 | | `ALL_COMPLETED`. |
352 +-----------------------------+----------------------------------------+
353 | :const:`ALL_COMPLETED` | The function will return when all |
354 | | futures finish or are cancelled. |
355 +-----------------------------+----------------------------------------+
356
357.. function:: as_completed(fs, timeout=None)
358
359 Returns an iterator over the :class:`Future` instances (possibly created
360 by different :class:`Executor` instances) given by *fs* that yields futures
361 as they complete (finished or were cancelled). Any futures that completed
362 before :func:`as_completed()` was called will be yielded first. The returned
363 iterator raises a :exc:`TimeoutError` if :meth:`__next__()` is called and
364 the result isn't available after *timeout* seconds from the original call
365 to :func:`as_completed()`. *timeout* can be an int or float. If *timeout*
366 is not specified or ``None`` then there is no limit to the wait time.