blob: 67ebbf515215cd311a061032618f50a9a165746d [file] [log] [blame]
Brian Quinlan81c4d362010-09-18 22:35:02 +00001# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4"""Implements ProcessPoolExecutor.
5
6The follow diagram and text describe the data-flow through the system:
7
8|======================= In-process =====================|== Out-of-process ==|
9
10+----------+ +----------+ +--------+ +-----------+ +---------+
11| | => | Work Ids | => | | => | Call Q | => | |
12| | +----------+ | | +-----------+ | |
13| | | ... | | | | ... | | |
14| | | 6 | | | | 5, call() | | |
15| | | 7 | | | | ... | | |
16| Process | | ... | | Local | +-----------+ | Process |
17| Pool | +----------+ | Worker | | #1..n |
18| Executor | | Thread | | |
19| | +----------- + | | +-----------+ | |
20| | <=> | Work Items | <=> | | <= | Result Q | <= | |
21| | +------------+ | | +-----------+ | |
22| | | 6: call() | | | | ... | | |
23| | | future | | | | 4, result | | |
24| | | ... | | | | 3, except | | |
25+----------+ +------------+ +--------+ +-----------+ +---------+
26
27Executor.submit() called:
28- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
29- adds the id of the _WorkItem to the "Work Ids" queue
30
31Local worker thread:
32- reads work ids from the "Work Ids" queue and looks up the corresponding
33 WorkItem from the "Work Items" dict: if the work item has been cancelled then
34 it is simply removed from the dict, otherwise it is repackaged as a
35 _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
36 until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
37 calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
38- reads _ResultItems from "Result Q", updates the future stored in the
39 "Work Items" dict and deletes the dict entry
40
41Process #1..n:
42- reads _CallItems from "Call Q", executes the calls, and puts the resulting
Mark Dickinson5ee24042012-10-20 13:16:49 +010043 _ResultItems in "Result Q"
Brian Quinlan81c4d362010-09-18 22:35:02 +000044"""
45
46__author__ = 'Brian Quinlan (brian@sweetapp.com)'
47
48import atexit
Antoine Pitroudd696492011-06-08 17:21:55 +020049import os
Brian Quinlan81c4d362010-09-18 22:35:02 +000050from concurrent.futures import _base
51import queue
Richard Oudkerk1f2eaa92013-10-16 17:06:22 +010052from queue import Full
Thomas Moreaue8c368d2017-10-03 11:53:17 +020053import multiprocessing as mp
Antoine Pitroubdb1cf12012-03-05 19:28:37 +010054from multiprocessing.connection import wait
Brian Quinlan81c4d362010-09-18 22:35:02 +000055import threading
56import weakref
Antoine Pitrou4aae2762014-10-04 20:20:10 +020057from functools import partial
58import itertools
Antoine Pitrou1285c9b2015-01-17 20:02:14 +010059import traceback
Brian Quinlan81c4d362010-09-18 22:35:02 +000060
61# Workers are created as daemon threads and processes. This is done to allow the
62# interpreter to exit when there are still idle processes in a
63# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
64# allowing workers to die with the interpreter has two undesirable properties:
Raymond Hettinger15f44ab2016-08-30 10:47:49 -070065# - The workers would still be running during interpreter shutdown,
Brian Quinlan81c4d362010-09-18 22:35:02 +000066# meaning that they would fail in unpredictable ways.
67# - The workers could be killed while evaluating a work item, which could
68# be bad if the callable being evaluated has external side-effects e.g.
69# writing to a file.
70#
71# To work around this problem, an exit handler is installed which tells the
72# workers to exit when their work queues are empty and then waits until the
73# threads/processes finish.
74
Antoine Pitrouc13d4542011-03-26 19:29:44 +010075_threads_queues = weakref.WeakKeyDictionary()
Thomas Moreaue8c368d2017-10-03 11:53:17 +020076_global_shutdown = False
Brian Quinlan81c4d362010-09-18 22:35:02 +000077
78def _python_exit():
Thomas Moreaue8c368d2017-10-03 11:53:17 +020079 global _global_shutdown
80 _global_shutdown = True
Antoine Pitrouc13d4542011-03-26 19:29:44 +010081 items = list(_threads_queues.items())
82 for t, q in items:
83 q.put(None)
84 for t, q in items:
85 t.join()
Brian Quinlan81c4d362010-09-18 22:35:02 +000086
87# Controls how many more calls than processes will be queued in the call queue.
88# A smaller number will mean that processes spend more time idle waiting for
89# work while a larger number will make Future.cancel() succeed less frequently
90# (Futures in the call queue cannot be cancelled).
91EXTRA_QUEUED_CALLS = 1
92
Antoine Pitrou1285c9b2015-01-17 20:02:14 +010093# Hack to embed stringification of remote traceback in local traceback
94
95class _RemoteTraceback(Exception):
96 def __init__(self, tb):
97 self.tb = tb
98 def __str__(self):
99 return self.tb
100
101class _ExceptionWithTraceback:
102 def __init__(self, exc, tb):
103 tb = traceback.format_exception(type(exc), exc, tb)
104 tb = ''.join(tb)
105 self.exc = exc
106 self.tb = '\n"""\n%s"""' % tb
107 def __reduce__(self):
108 return _rebuild_exc, (self.exc, self.tb)
109
110def _rebuild_exc(exc, tb):
111 exc.__cause__ = _RemoteTraceback(tb)
112 return exc
113
Brian Quinlan81c4d362010-09-18 22:35:02 +0000114class _WorkItem(object):
115 def __init__(self, future, fn, args, kwargs):
116 self.future = future
117 self.fn = fn
118 self.args = args
119 self.kwargs = kwargs
120
121class _ResultItem(object):
122 def __init__(self, work_id, exception=None, result=None):
123 self.work_id = work_id
124 self.exception = exception
125 self.result = result
126
127class _CallItem(object):
128 def __init__(self, work_id, fn, args, kwargs):
129 self.work_id = work_id
130 self.fn = fn
131 self.args = args
132 self.kwargs = kwargs
133
Antoine Pitrou4aae2762014-10-04 20:20:10 +0200134def _get_chunks(*iterables, chunksize):
135 """ Iterates over zip()ed iterables in chunks. """
136 it = zip(*iterables)
137 while True:
138 chunk = tuple(itertools.islice(it, chunksize))
139 if not chunk:
140 return
141 yield chunk
142
143def _process_chunk(fn, chunk):
144 """ Processes a chunk of an iterable passed to map.
145
146 Runs the function passed to map() on a chunk of the
147 iterable passed to map.
148
149 This function is run in a separate process.
150
151 """
152 return [fn(*args) for args in chunk]
153
Antoine Pitrou27be5da2011-04-12 17:48:46 +0200154def _process_worker(call_queue, result_queue):
Brian Quinlan81c4d362010-09-18 22:35:02 +0000155 """Evaluates calls from call_queue and places the results in result_queue.
156
Georg Brandlfb1720b2010-12-09 18:08:43 +0000157 This worker is run in a separate process.
Brian Quinlan81c4d362010-09-18 22:35:02 +0000158
159 Args:
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200160 call_queue: A ctx.Queue of _CallItems that will be read and
Brian Quinlan81c4d362010-09-18 22:35:02 +0000161 evaluated by the worker.
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200162 result_queue: A ctx.Queue of _ResultItems that will written
Brian Quinlan81c4d362010-09-18 22:35:02 +0000163 to by the worker.
Brian Quinlan81c4d362010-09-18 22:35:02 +0000164 """
165 while True:
Antoine Pitrou27be5da2011-04-12 17:48:46 +0200166 call_item = call_queue.get(block=True)
167 if call_item is None:
168 # Wake up queue management thread
Antoine Pitroudd696492011-06-08 17:21:55 +0200169 result_queue.put(os.getpid())
Antoine Pitrou27be5da2011-04-12 17:48:46 +0200170 return
Brian Quinlan81c4d362010-09-18 22:35:02 +0000171 try:
Antoine Pitrou27be5da2011-04-12 17:48:46 +0200172 r = call_item.fn(*call_item.args, **call_item.kwargs)
173 except BaseException as e:
Antoine Pitrou1285c9b2015-01-17 20:02:14 +0100174 exc = _ExceptionWithTraceback(e, e.__traceback__)
175 result_queue.put(_ResultItem(call_item.work_id, exception=exc))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000176 else:
Antoine Pitrou27be5da2011-04-12 17:48:46 +0200177 result_queue.put(_ResultItem(call_item.work_id,
178 result=r))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000179
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200180 # Liberate the resource as soon as possible, to avoid holding onto
181 # open files or shared memory that is not needed anymore
182 del call_item
183
184
Brian Quinlan81c4d362010-09-18 22:35:02 +0000185def _add_call_item_to_queue(pending_work_items,
186 work_ids,
187 call_queue):
188 """Fills call_queue with _WorkItems from pending_work_items.
189
190 This function never blocks.
191
192 Args:
193 pending_work_items: A dict mapping work ids to _WorkItems e.g.
194 {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
195 work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
196 are consumed and the corresponding _WorkItems from
197 pending_work_items are transformed into _CallItems and put in
198 call_queue.
199 call_queue: A multiprocessing.Queue that will be filled with _CallItems
200 derived from _WorkItems.
201 """
202 while True:
203 if call_queue.full():
204 return
205 try:
206 work_id = work_ids.get(block=False)
207 except queue.Empty:
208 return
209 else:
210 work_item = pending_work_items[work_id]
211
212 if work_item.future.set_running_or_notify_cancel():
213 call_queue.put(_CallItem(work_id,
214 work_item.fn,
215 work_item.args,
216 work_item.kwargs),
217 block=True)
218 else:
219 del pending_work_items[work_id]
220 continue
221
Antoine Pitroub87a56a2011-05-03 16:34:42 +0200222def _queue_management_worker(executor_reference,
223 processes,
224 pending_work_items,
225 work_ids_queue,
226 call_queue,
227 result_queue):
Brian Quinlan81c4d362010-09-18 22:35:02 +0000228 """Manages the communication between this process and the worker processes.
229
230 This function is run in a local thread.
231
232 Args:
233 executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
234 this thread. Used to determine if the ProcessPoolExecutor has been
235 garbage collected and that this function can exit.
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200236 process: A list of the ctx.Process instances used as
Brian Quinlan81c4d362010-09-18 22:35:02 +0000237 workers.
238 pending_work_items: A dict mapping work ids to _WorkItems e.g.
239 {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
240 work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200241 call_queue: A ctx.Queue that will be filled with _CallItems
Brian Quinlan81c4d362010-09-18 22:35:02 +0000242 derived from _WorkItems for processing by the process workers.
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200243 result_queue: A ctx.SimpleQueue of _ResultItems generated by the
Brian Quinlan81c4d362010-09-18 22:35:02 +0000244 process workers.
Brian Quinlan81c4d362010-09-18 22:35:02 +0000245 """
Antoine Pitrou020436b2011-07-02 21:20:25 +0200246 executor = None
247
248 def shutting_down():
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200249 return (_global_shutdown or executor is None
250 or executor._shutdown_thread)
Antoine Pitroudd696492011-06-08 17:21:55 +0200251
252 def shutdown_worker():
253 # This is an upper bound
254 nb_children_alive = sum(p.is_alive() for p in processes.values())
255 for i in range(0, nb_children_alive):
Antoine Pitrou1c405b32011-07-03 13:17:06 +0200256 call_queue.put_nowait(None)
Antoine Pitroudc19c242011-07-16 01:51:58 +0200257 # Release the queue's resources as soon as possible.
258 call_queue.close()
Antoine Pitroudd696492011-06-08 17:21:55 +0200259 # If .join() is not called on the created processes then
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200260 # some ctx.Queue methods may deadlock on Mac OS X.
Antoine Pitroudd696492011-06-08 17:21:55 +0200261 for p in processes.values():
262 p.join()
263
Antoine Pitroubdb1cf12012-03-05 19:28:37 +0100264 reader = result_queue._reader
265
Brian Quinlan81c4d362010-09-18 22:35:02 +0000266 while True:
267 _add_call_item_to_queue(pending_work_items,
268 work_ids_queue,
269 call_queue)
270
Antoine Pitroudd696492011-06-08 17:21:55 +0200271 sentinels = [p.sentinel for p in processes.values()]
272 assert sentinels
Antoine Pitroubdb1cf12012-03-05 19:28:37 +0100273 ready = wait([reader] + sentinels)
274 if reader in ready:
275 result_item = reader.recv()
276 else:
Antoine Pitroudd696492011-06-08 17:21:55 +0200277 # Mark the process pool broken so that submits fail right now.
278 executor = executor_reference()
279 if executor is not None:
280 executor._broken = True
281 executor._shutdown_thread = True
Antoine Pitrou020436b2011-07-02 21:20:25 +0200282 executor = None
Antoine Pitroudd696492011-06-08 17:21:55 +0200283 # All futures in flight must be marked failed
284 for work_id, work_item in pending_work_items.items():
285 work_item.future.set_exception(
286 BrokenProcessPool(
287 "A process in the process pool was "
288 "terminated abruptly while the future was "
289 "running or pending."
290 ))
Andrew Svetlov6b973742012-11-03 15:36:01 +0200291 # Delete references to object. See issue16284
292 del work_item
Antoine Pitroudd696492011-06-08 17:21:55 +0200293 pending_work_items.clear()
294 # Terminate remaining workers forcibly: the queues or their
295 # locks may be in a dirty state and block forever.
296 for p in processes.values():
297 p.terminate()
Antoine Pitroudc19c242011-07-16 01:51:58 +0200298 shutdown_worker()
Antoine Pitroudd696492011-06-08 17:21:55 +0200299 return
300 if isinstance(result_item, int):
301 # Clean shutdown of a worker using its PID
302 # (avoids marking the executor broken)
Antoine Pitrou020436b2011-07-02 21:20:25 +0200303 assert shutting_down()
Antoine Pitroud06a0652011-07-16 01:13:34 +0200304 p = processes.pop(result_item)
305 p.join()
Antoine Pitrou020436b2011-07-02 21:20:25 +0200306 if not processes:
307 shutdown_worker()
308 return
Antoine Pitroudd696492011-06-08 17:21:55 +0200309 elif result_item is not None:
310 work_item = pending_work_items.pop(result_item.work_id, None)
311 # work_item can be None if another process terminated (see above)
312 if work_item is not None:
313 if result_item.exception:
314 work_item.future.set_exception(result_item.exception)
315 else:
316 work_item.future.set_result(result_item.result)
Andrew Svetlov6b973742012-11-03 15:36:01 +0200317 # Delete references to object. See issue16284
318 del work_item
Antoine Pitroudd696492011-06-08 17:21:55 +0200319 # Check whether we should start shutting down.
Antoine Pitrouc13d4542011-03-26 19:29:44 +0100320 executor = executor_reference()
321 # No more work items can be added if:
322 # - The interpreter is shutting down OR
323 # - The executor that owns this worker has been collected OR
324 # - The executor that owns this worker has been shutdown.
Antoine Pitrou020436b2011-07-02 21:20:25 +0200325 if shutting_down():
Antoine Pitrou020436b2011-07-02 21:20:25 +0200326 try:
Antoine Pitrou1c405b32011-07-03 13:17:06 +0200327 # Since no new work items can be added, it is safe to shutdown
328 # this thread if there are no pending work items.
329 if not pending_work_items:
330 shutdown_worker()
331 return
Antoine Pitrou020436b2011-07-02 21:20:25 +0200332 except Full:
333 # This is not a problem: we will eventually be woken up (in
Antoine Pitrou1c405b32011-07-03 13:17:06 +0200334 # result_queue.get()) and be able to send a sentinel again.
Antoine Pitrou020436b2011-07-02 21:20:25 +0200335 pass
336 executor = None
Brian Quinlan81c4d362010-09-18 22:35:02 +0000337
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000338_system_limits_checked = False
339_system_limited = None
340def _check_system_limits():
341 global _system_limits_checked, _system_limited
342 if _system_limits_checked:
343 if _system_limited:
344 raise NotImplementedError(_system_limited)
345 _system_limits_checked = True
346 try:
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000347 nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
348 except (AttributeError, ValueError):
349 # sysconf not available or setting not available
350 return
351 if nsems_max == -1:
Ezio Melottib5bc3532013-08-17 16:11:40 +0300352 # indetermined limit, assume that limit is determined
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000353 # by available memory only
354 return
355 if nsems_max >= 256:
356 # minimum number of semaphores available
357 # according to POSIX
358 return
359 _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
360 raise NotImplementedError(_system_limited)
361
Antoine Pitroudd696492011-06-08 17:21:55 +0200362
Grzegorz Grzywacz97e1b1c2017-09-01 18:54:00 +0200363def _chain_from_iterable_of_lists(iterable):
364 """
365 Specialized implementation of itertools.chain.from_iterable.
366 Each item in *iterable* should be a list. This function is
367 careful not to keep references to yielded objects.
368 """
369 for element in iterable:
370 element.reverse()
371 while element:
372 yield element.pop()
373
374
Antoine Pitroudd696492011-06-08 17:21:55 +0200375class BrokenProcessPool(RuntimeError):
376 """
377 Raised when a process in a ProcessPoolExecutor terminated abruptly
378 while a future was in the running state.
379 """
380
381
Brian Quinlan81c4d362010-09-18 22:35:02 +0000382class ProcessPoolExecutor(_base.Executor):
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200383 def __init__(self, max_workers=None, mp_context=None):
Brian Quinlan81c4d362010-09-18 22:35:02 +0000384 """Initializes a new ProcessPoolExecutor instance.
385
386 Args:
387 max_workers: The maximum number of processes that can be used to
388 execute the given calls. If None or not given then as many
389 worker processes will be created as the machine has processors.
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200390 mp_context: A multiprocessing context to launch the workers. This
391 object should provide SimpleQueue, Queue and Process.
Brian Quinlan81c4d362010-09-18 22:35:02 +0000392 """
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000393 _check_system_limits()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000394
395 if max_workers is None:
Charles-François Natali37cfb0a2013-06-28 19:25:45 +0200396 self._max_workers = os.cpu_count() or 1
Brian Quinlan81c4d362010-09-18 22:35:02 +0000397 else:
Brian Quinlan20efceb2014-05-17 13:51:10 -0700398 if max_workers <= 0:
399 raise ValueError("max_workers must be greater than 0")
400
Brian Quinlan81c4d362010-09-18 22:35:02 +0000401 self._max_workers = max_workers
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200402 if mp_context is None:
403 mp_context = mp.get_context()
404 self._mp_context = mp_context
Brian Quinlan81c4d362010-09-18 22:35:02 +0000405
406 # Make the call queue slightly larger than the number of processes to
407 # prevent the worker processes from idling. But don't make it too big
408 # because futures in the call queue cannot be cancelled.
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200409 queue_size = self._max_workers + EXTRA_QUEUED_CALLS
410 self._call_queue = mp_context.Queue(queue_size)
Antoine Pitroudc19c242011-07-16 01:51:58 +0200411 # Killed worker processes can produce spurious "broken pipe"
412 # tracebacks in the queue's own worker thread. But we detect killed
413 # processes anyway, so silence the tracebacks.
414 self._call_queue._ignore_epipe = True
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200415 self._result_queue = mp_context.SimpleQueue()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000416 self._work_ids = queue.Queue()
417 self._queue_management_thread = None
Antoine Pitroudd696492011-06-08 17:21:55 +0200418 # Map of pids to processes
419 self._processes = {}
Brian Quinlan81c4d362010-09-18 22:35:02 +0000420
421 # Shutdown is a two-step process.
422 self._shutdown_thread = False
Brian Quinlan81c4d362010-09-18 22:35:02 +0000423 self._shutdown_lock = threading.Lock()
Antoine Pitroudd696492011-06-08 17:21:55 +0200424 self._broken = False
Brian Quinlan81c4d362010-09-18 22:35:02 +0000425 self._queue_count = 0
426 self._pending_work_items = {}
427
428 def _start_queue_management_thread(self):
Antoine Pitrouc13d4542011-03-26 19:29:44 +0100429 # When the executor gets lost, the weakref callback will wake up
430 # the queue management thread.
431 def weakref_cb(_, q=self._result_queue):
432 q.put(None)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000433 if self._queue_management_thread is None:
Antoine Pitroudd696492011-06-08 17:21:55 +0200434 # Start the processes so that their sentinels are known.
435 self._adjust_process_count()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000436 self._queue_management_thread = threading.Thread(
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200437 target=_queue_management_worker,
438 args=(weakref.ref(self, weakref_cb),
439 self._processes,
440 self._pending_work_items,
441 self._work_ids,
442 self._call_queue,
443 self._result_queue))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000444 self._queue_management_thread.daemon = True
445 self._queue_management_thread.start()
Antoine Pitrouc13d4542011-03-26 19:29:44 +0100446 _threads_queues[self._queue_management_thread] = self._result_queue
Brian Quinlan81c4d362010-09-18 22:35:02 +0000447
448 def _adjust_process_count(self):
449 for _ in range(len(self._processes), self._max_workers):
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200450 p = self._mp_context.Process(
451 target=_process_worker,
452 args=(self._call_queue,
453 self._result_queue))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000454 p.start()
Antoine Pitroudd696492011-06-08 17:21:55 +0200455 self._processes[p.pid] = p
Brian Quinlan81c4d362010-09-18 22:35:02 +0000456
457 def submit(self, fn, *args, **kwargs):
458 with self._shutdown_lock:
Antoine Pitroudd696492011-06-08 17:21:55 +0200459 if self._broken:
460 raise BrokenProcessPool('A child process terminated '
461 'abruptly, the process pool is not usable anymore')
Brian Quinlan81c4d362010-09-18 22:35:02 +0000462 if self._shutdown_thread:
463 raise RuntimeError('cannot schedule new futures after shutdown')
464
465 f = _base.Future()
466 w = _WorkItem(f, fn, args, kwargs)
467
468 self._pending_work_items[self._queue_count] = w
469 self._work_ids.put(self._queue_count)
470 self._queue_count += 1
Antoine Pitrouc13d4542011-03-26 19:29:44 +0100471 # Wake up queue management thread
472 self._result_queue.put(None)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000473
474 self._start_queue_management_thread()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000475 return f
476 submit.__doc__ = _base.Executor.submit.__doc__
477
Antoine Pitrou4aae2762014-10-04 20:20:10 +0200478 def map(self, fn, *iterables, timeout=None, chunksize=1):
Martin Panterd2ad5712015-11-02 04:20:33 +0000479 """Returns an iterator equivalent to map(fn, iter).
Antoine Pitrou4aae2762014-10-04 20:20:10 +0200480
481 Args:
482 fn: A callable that will take as many arguments as there are
483 passed iterables.
484 timeout: The maximum number of seconds to wait. If None, then there
485 is no limit on the wait time.
486 chunksize: If greater than one, the iterables will be chopped into
487 chunks of size chunksize and submitted to the process pool.
488 If set to one, the items in the list will be sent one at a time.
489
490 Returns:
491 An iterator equivalent to: map(func, *iterables) but the calls may
492 be evaluated out-of-order.
493
494 Raises:
495 TimeoutError: If the entire result iterator could not be generated
496 before the given timeout.
497 Exception: If fn(*args) raises for any values.
498 """
499 if chunksize < 1:
500 raise ValueError("chunksize must be >= 1.")
501
502 results = super().map(partial(_process_chunk, fn),
503 _get_chunks(*iterables, chunksize=chunksize),
504 timeout=timeout)
Grzegorz Grzywacz97e1b1c2017-09-01 18:54:00 +0200505 return _chain_from_iterable_of_lists(results)
Antoine Pitrou4aae2762014-10-04 20:20:10 +0200506
Brian Quinlan81c4d362010-09-18 22:35:02 +0000507 def shutdown(self, wait=True):
508 with self._shutdown_lock:
509 self._shutdown_thread = True
Antoine Pitrouc13d4542011-03-26 19:29:44 +0100510 if self._queue_management_thread:
511 # Wake up queue management thread
512 self._result_queue.put(None)
513 if wait:
Brian Quinlan81c4d362010-09-18 22:35:02 +0000514 self._queue_management_thread.join()
Ezio Melottib5bc3532013-08-17 16:11:40 +0300515 # To reduce the risk of opening too many files, remove references to
Brian Quinlan81c4d362010-09-18 22:35:02 +0000516 # objects that use file descriptors.
517 self._queue_management_thread = None
Victor Stinnerb713adf2017-09-02 00:25:11 +0200518 if self._call_queue is not None:
519 self._call_queue.close()
520 if wait:
521 self._call_queue.join_thread()
522 self._call_queue = None
Brian Quinlan81c4d362010-09-18 22:35:02 +0000523 self._result_queue = None
Brian Quinlan81c4d362010-09-18 22:35:02 +0000524 self._processes = None
525 shutdown.__doc__ = _base.Executor.shutdown.__doc__
526
527atexit.register(_python_exit)