blob: 35af65d0beeea6630af3f8477374984595452e5a [file] [log] [blame]
Brian Quinlan81c4d362010-09-18 22:35:02 +00001# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4"""Implements ProcessPoolExecutor.
5
The following diagram and text describe the data-flow through the system:
7
8|======================= In-process =====================|== Out-of-process ==|
9
10+----------+ +----------+ +--------+ +-----------+ +---------+
11| | => | Work Ids | => | | => | Call Q | => | |
12| | +----------+ | | +-----------+ | |
13| | | ... | | | | ... | | |
14| | | 6 | | | | 5, call() | | |
15| | | 7 | | | | ... | | |
16| Process | | ... | | Local | +-----------+ | Process |
17| Pool | +----------+ | Worker | | #1..n |
18| Executor | | Thread | | |
19| | +----------- + | | +-----------+ | |
20| | <=> | Work Items | <=> | | <= | Result Q | <= | |
21| | +------------+ | | +-----------+ | |
22| | | 6: call() | | | | ... | | |
23| | | future | | | | 4, result | | |
24| | | ... | | | | 3, except | | |
25+----------+ +------------+ +--------+ +-----------+ +---------+
26
27Executor.submit() called:
28- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
29- adds the id of the _WorkItem to the "Work Ids" queue
30
31Local worker thread:
32- reads work ids from the "Work Ids" queue and looks up the corresponding
33 WorkItem from the "Work Items" dict: if the work item has been cancelled then
34 it is simply removed from the dict, otherwise it is repackaged as a
35 _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
36 until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
37 calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
38- reads _ResultItems from "Result Q", updates the future stored in the
39 "Work Items" dict and deletes the dict entry
40
41Process #1..n:
42- reads _CallItems from "Call Q", executes the calls, and puts the resulting
Mark Dickinson5ee24042012-10-20 13:16:49 +010043 _ResultItems in "Result Q"
Brian Quinlan81c4d362010-09-18 22:35:02 +000044"""
45
46__author__ = 'Brian Quinlan (brian@sweetapp.com)'
47
48import atexit
Antoine Pitroudd696492011-06-08 17:21:55 +020049import os
Brian Quinlan81c4d362010-09-18 22:35:02 +000050from concurrent.futures import _base
51import queue
Richard Oudkerk1f2eaa92013-10-16 17:06:22 +010052from queue import Full
Thomas Moreaue8c368d2017-10-03 11:53:17 +020053import multiprocessing as mp
Antoine Pitroubdb1cf12012-03-05 19:28:37 +010054from multiprocessing.connection import wait
Brian Quinlan81c4d362010-09-18 22:35:02 +000055import threading
56import weakref
Antoine Pitrou4aae2762014-10-04 20:20:10 +020057from functools import partial
58import itertools
Antoine Pitrou1285c9b2015-01-17 20:02:14 +010059import traceback
Brian Quinlan81c4d362010-09-18 22:35:02 +000060
61# Workers are created as daemon threads and processes. This is done to allow the
62# interpreter to exit when there are still idle processes in a
63# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
64# allowing workers to die with the interpreter has two undesirable properties:
Raymond Hettinger15f44ab2016-08-30 10:47:49 -070065# - The workers would still be running during interpreter shutdown,
Brian Quinlan81c4d362010-09-18 22:35:02 +000066# meaning that they would fail in unpredictable ways.
67# - The workers could be killed while evaluating a work item, which could
68# be bad if the callable being evaluated has external side-effects e.g.
69# writing to a file.
70#
71# To work around this problem, an exit handler is installed which tells the
72# workers to exit when their work queues are empty and then waits until the
73# threads/processes finish.
74
# Maps each queue management thread to the result queue used to wake it up.
# Weak keys let an entry disappear once its thread object is collected.
_threads_queues = weakref.WeakKeyDictionary()
# Set to True by _python_exit(); read by the queue management threads.
_global_shutdown = False


def _python_exit():
    """Flag global shutdown, wake every management thread and join them all.

    Every registered queue receives a ``None`` wake-up item before any
    thread is joined, so all threads are notified first and waited on after.
    """
    global _global_shutdown
    _global_shutdown = True
    # Snapshot the mapping: entries of a weak dictionary may vanish while
    # we iterate.
    registered = list(_threads_queues.items())
    for _, wakeup_queue in registered:
        wakeup_queue.put(None)
    for thread, _ in registered:
        thread.join()
Brian Quinlan81c4d362010-09-18 22:35:02 +000086
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
# ProcessPoolExecutor sizes its call queue as max_workers + EXTRA_QUEUED_CALLS.
EXTRA_QUEUED_CALLS = 1
92
# Hack to embed stringification of remote traceback in local traceback


class _RemoteTraceback(Exception):
    """Exception whose string form is a pre-formatted traceback text."""

    def __init__(self, tb):
        self.tb = tb

    def __str__(self):
        return self.tb


class _ExceptionWithTraceback:
    """Pairs an exception with its formatted traceback for pickling.

    Unpickling does not recreate this wrapper: __reduce__ delegates to
    _rebuild_exc, which returns the original exception with the formatted
    traceback attached as its __cause__.
    """

    def __init__(self, exc, tb):
        # Format eagerly, so only a plain string has to be pickled.
        rendered = ''.join(traceback.format_exception(type(exc), exc, tb))
        self.exc = exc
        self.tb = '\n"""\n%s"""' % rendered

    def __reduce__(self):
        return _rebuild_exc, (self.exc, self.tb)


def _rebuild_exc(exc, tb):
    """Attach *tb* (a string) to *exc* as a _RemoteTraceback cause; return *exc*."""
    exc.__cause__ = _RemoteTraceback(tb)
    return exc
113
class _WorkItem:
    """A submitted call together with the future that will receive its outcome."""

    def __init__(self, future, fn, args, kwargs):
        # The callable and its arguments, stored as given.
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        # The future handed back to the submitter.
        self.future = future
120
class _ResultItem:
    """Outcome of one call, keyed by work id: an exception or a result."""

    def __init__(self, work_id, exception=None, result=None):
        # Identifies which pending work item this outcome belongs to.
        self.work_id = work_id
        self.result = result
        self.exception = exception
126
class _CallItem:
    """A call to execute (fn, args, kwargs) tagged with its work id."""

    def __init__(self, work_id, fn, args, kwargs):
        # Lets the resulting _ResultItem be matched to its work item.
        self.work_id = work_id
        # The callable and its arguments, stored as given.
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
133
Antoine Pitrou63ff4132017-11-04 11:05:49 +0100134
def _get_chunks(*iterables, chunksize):
    """Yield tuples of up to *chunksize* rows taken from zip(*iterables).

    Each row is itself the tuple produced by zip(). Iteration stops at the
    first empty chunk, so no empty tuple is ever yielded.
    """
    rows = zip(*iterables)
    chunk = tuple(itertools.islice(rows, chunksize))
    while chunk:
        yield chunk
        chunk = tuple(itertools.islice(rows, chunksize))
143
def _process_chunk(fn, chunk):
    """Apply *fn* to every argument tuple in *chunk* and return the results.

    *chunk* is a group of argument tuples cut from the iterables passed to
    map(); each tuple is unpacked into positional arguments of *fn*.

    This function is run in a separate process.

    """
    return list(itertools.starmap(fn, chunk))
154
def _process_worker(call_queue, result_queue, initializer, initargs):
    """Run calls taken from call_queue and report outcomes on result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A ctx.Queue of _ResultItems that will be written
            to by the worker.
        initializer: A callable initializer, or None
        initargs: A tuple of args for the initializer
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            # Returning ends the worker; the parent will notice that the
            # process stopped and mark the pool broken.
            return

    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Shutdown sentinel: post our pid to wake up the queue
            # management thread, then stop.
            result_queue.put(os.getpid())
            return

        try:
            outcome = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as err:
            wrapped = _ExceptionWithTraceback(err, err.__traceback__)
            result_queue.put(_ResultItem(call_item.work_id, exception=wrapped))
        else:
            result_queue.put(_ResultItem(call_item.work_id, result=outcome))

        # Drop the reference before blocking on the next get(), to avoid
        # holding onto open files or shared memory that is not needed anymore.
        del call_item
194
195
def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Move queued work into call_queue until it is full or no ids remain.

    The loop stops as soon as call_queue reports full or work_ids is empty.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while not call_queue.full():
        try:
            next_id = work_ids.get(block=False)
        except queue.Empty:
            return

        item = pending_work_items[next_id]
        if not item.future.set_running_or_notify_cancel():
            # The future was cancelled before it could start: forget it and
            # move on to the next id.
            del pending_work_items[next_id]
            continue

        call_queue.put(_CallItem(next_id, item.fn, item.args, item.kwargs),
                       block=True)
232
def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Each loop iteration refills call_queue, then blocks until either the
    result queue's reader or one of the workers' sentinels is ready. A
    readable result is dispatched to its future; a ready sentinel without a
    readable result is treated as an abrupt worker death and breaks the pool.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A dict mapping worker pids to the ctx.Process instances
            used as workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A ctx.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A ctx.SimpleQueue of _ResultItems generated by the
            process workers. Besides _ResultItems it also carries None
            (a plain wake-up) and worker pids (clean worker exit).
    """
    # Strong reference to the executor; only held briefly within an iteration
    # and reset to None afterwards so this thread does not keep it alive.
    executor = None

    def shutting_down():
        # Reads the enclosing `executor` variable, so the result depends on
        # what the main loop last assigned to it.
        return (_global_shutdown or executor is None
                or executor._shutdown_thread)

    def shutdown_worker():
        # This is an upper bound
        nb_children_alive = sum(p.is_alive() for p in processes.values())
        # One None sentinel per live worker; put_nowait() raises queue.Full
        # if the call queue has no free slot.
        for i in range(0, nb_children_alive):
            call_queue.put_nowait(None)
        # Release the queue's resources as soon as possible.
        call_queue.close()
        # If .join() is not called on the created processes then
        # some ctx.Queue methods may deadlock on Mac OS X.
        for p in processes.values():
            p.join()

    # Read end of the result queue, so it can be passed to wait() together
    # with the process sentinels.
    reader = result_queue._reader

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        sentinels = [p.sentinel for p in processes.values()]
        assert sentinels
        ready = wait([reader] + sentinels)
        if reader in ready:
            result_item = reader.recv()
        else:
            # Only sentinels are ready: a worker ended without announcing it.
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = ('A child process terminated '
                                    'abruptly, the process pool is not '
                                    'usable anymore')
                executor._shutdown_thread = True
                executor = None
            # All futures in flight must be marked failed
            for work_id, work_item in pending_work_items.items():
                work_item.future.set_exception(
                    BrokenProcessPool(
                        "A process in the process pool was "
                        "terminated abruptly while the future was "
                        "running or pending."
                    ))
                # Delete references to object. See issue16284
                del work_item
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            # NOTE(review): shutdown_worker() uses put_nowait(), and a
            # queue.Full raised here is not caught (unlike in the
            # shutting_down() branch at the end of the loop) - confirm that
            # this cannot happen on this path.
            shutdown_worker()
            return
        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284
                del work_item
        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if shutting_down():
            try:
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (by
                # wait() on the result queue's reader) and be able to send a
                # sentinel again.
                pass
        # Drop the strong reference before blocking again.
        executor = None
Brian Quinlan81c4d362010-09-18 22:35:02 +0000350
# Cache for _check_system_limits(): whether the check has already run, and
# the error message if the platform turned out to be unusable (else None).
_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    """Raise NotImplementedError if the system offers too few semaphores.

    The outcome of the first call is cached in module globals. Later calls
    re-raise the cached error or return immediately, without querying
    os.sysconf() again.

    Raises:
        NotImplementedError: If os.sysconf("SC_SEM_NSEMS_MAX") reports a
            limit below 256.
    """
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
        # A previous call already found the platform usable; nothing to redo.
        return
    _system_limits_checked = True
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # undetermined limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores "
                       "(%d available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)
374
Antoine Pitroudd696492011-06-08 17:21:55 +0200375
def _chain_from_iterable_of_lists(iterable):
    """
    Flatten an iterable of lists, yielding their items in order.

    Specialized replacement for itertools.chain.from_iterable. Each list is
    reversed in place and then emptied by popping from its end, so a list
    no longer references an item once that item has been yielded.
    """
    for batch in iterable:
        batch.reverse()
        take_next = batch.pop
        while batch:
            yield take_next()
386
387
class BrokenProcessPool(_base.BrokenExecutor):
    """
    Raised when a process in a ProcessPoolExecutor terminated abruptly
    while a future was running or pending, and on later submissions to
    the pool that was marked broken as a result.
    """
393
394
class ProcessPoolExecutor(_base.Executor):
    """Executor that runs submitted calls in a pool of worker processes."""

    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=()):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
            mp_context: A multiprocessing context to launch the workers. This
                object should provide SimpleQueue, Queue and Process.
            initializer: A callable used to initialize worker processes.
            initargs: A tuple of arguments to pass to the initializer.

        Raises:
            NotImplementedError: If _check_system_limits() rejects the system.
            ValueError: If max_workers is given and is not greater than 0.
            TypeError: If initializer is neither None nor callable.
        """
        _check_system_limits()

        if max_workers is not None:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")
            self._max_workers = max_workers
        else:
            self._max_workers = os.cpu_count() or 1

        if mp_context is None:
            mp_context = mp.get_context()
        self._mp_context = mp_context

        if not (initializer is None or callable(initializer)):
            raise TypeError("initializer must be a callable")
        self._initializer = initializer
        self._initargs = initargs

        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        call_queue = mp_context.Queue(self._max_workers + EXTRA_QUEUED_CALLS)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        call_queue._ignore_epipe = True
        self._call_queue = call_queue
        self._result_queue = mp_context.SimpleQueue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        # Map of pids to processes
        self._processes = {}

        # Shutdown state: the flag is set under the lock by shutdown() and
        # checked under the same lock by submit().
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        # False, or a message string once the pool has been marked broken.
        self._broken = False
        # Next work id to hand out.
        self._queue_count = 0
        self._pending_work_items = {}

    def _start_queue_management_thread(self):
        """Start the workers and the management thread if not yet running."""
        if self._queue_management_thread is not None:
            return

        # When the executor gets lost, the weakref callback will wake up
        # the queue management thread. The queue is bound as a default
        # argument so the callback does not reference the executor itself.
        def weakref_cb(_, q=self._result_queue):
            q.put(None)

        # Start the processes so that their sentinels are known.
        self._adjust_process_count()
        manager = threading.Thread(
            target=_queue_management_worker,
            args=(weakref.ref(self, weakref_cb),
                  self._processes,
                  self._pending_work_items,
                  self._work_ids,
                  self._call_queue,
                  self._result_queue))
        self._queue_management_thread = manager
        manager.daemon = True
        manager.start()
        _threads_queues[manager] = self._result_queue

    def _adjust_process_count(self):
        """Start worker processes until there are max_workers of them."""
        for _ in range(len(self._processes), self._max_workers):
            worker = self._mp_context.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue,
                      self._initializer,
                      self._initargs))
            worker.start()
            self._processes[worker.pid] = worker

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool(self._broken)
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            future = _base.Future()
            work_id = self._queue_count

            self._pending_work_items[work_id] = _WorkItem(
                future, fn, args, kwargs)
            self._work_ids.put(work_id)
            self._queue_count = work_id + 1
            # Wake up queue management thread
            self._result_queue.put(None)

            self._start_queue_management_thread()
            return future
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a time.

        Returns:
            An iterator equivalent to: map(fn, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            ValueError: If chunksize is less than 1.
            Exception: If fn(*args) raises for any values.
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        # Each submitted call handles one whole chunk and returns a list of
        # results; the lists are flattened back into a single stream.
        chunk_runner = partial(_process_chunk, fn)
        chunks = _get_chunks(*iterables, chunksize=chunksize)
        per_chunk_results = super().map(chunk_runner, chunks, timeout=timeout)
        return _chain_from_iterable_of_lists(per_chunk_results)

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True

        manager = self._queue_management_thread
        if manager:
            # Wake up queue management thread
            self._result_queue.put(None)
            if wait:
                manager.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None

        call_queue = self._call_queue
        if call_queue is not None:
            call_queue.close()
            if wait:
                call_queue.join_thread()
            self._call_queue = None
        self._result_queue = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
548
# At interpreter exit, run _python_exit() so that the queue management
# threads are woken up and joined.
atexit.register(_python_exit)