# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +----------- +     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call() |      |        |     | ...       |    |         |
|          |     |    future |      |        |     | 4, result |    |         |
|          |     | ...       |      |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  _WorkItem from the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
"""
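
# Illustrative usage sketch of the data-flow described above (not part of this
# module's implementation; the function `square` and the worker count are
# arbitrary choices made for the example):
#
#     from concurrent.futures import ProcessPoolExecutor
#
#     def square(x):
#         return x * x
#
#     if __name__ == '__main__':
#         with ProcessPoolExecutor(max_workers=4) as executor:
#             future = executor.submit(square, 3)    # one _WorkItem
#             assert future.result() == 9
#             # map() batches items into chunks (see _process_chunk below)
#             assert list(executor.map(square, range(4), chunksize=2)) == [0, 1, 4, 9]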

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

import atexit
import os
from concurrent.futures import _base
import queue
from queue import Full
import multiprocessing as mp
from multiprocessing.connection import wait
from multiprocessing.queues import Queue
import threading
import weakref
from functools import partial
import itertools
import traceback

# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.

_threads_wakeups = weakref.WeakKeyDictionary()
_global_shutdown = False


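# Descriptive note: _ThreadWakeup gives the queue management thread something
# it can block on alongside the worker result pipe. wakeup() writes a dummy
# byte into a multiprocessing Pipe so that the thread's wait() call returns,
# and clear() drains any pending bytes once the thread has woken up.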
class _ThreadWakeup:
    def __init__(self):
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        self._writer.close()
        self._reader.close()

    def wakeup(self):
        self._writer.send_bytes(b"")

    def clear(self):
        while self._reader.poll():
            self._reader.recv_bytes()


def _python_exit():
    global _global_shutdown
    _global_shutdown = True
    items = list(_threads_wakeups.items())
    for _, thread_wakeup in items:
        thread_wakeup.wakeup()
    for t, _ in items:
        t.join()

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1
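# For example, with this default of 1 and max_workers=4, the executor sizes
# its call queue at 4 + 1 = 5 entries (see queue_size in
# ProcessPoolExecutor.__init__ below).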


# Hack to embed stringification of remote traceback in local traceback

class _RemoteTraceback(Exception):
    def __init__(self, tb):
        self.tb = tb
    def __str__(self):
        return self.tb

class _ExceptionWithTraceback:
    def __init__(self, exc, tb):
        tb = traceback.format_exception(type(exc), exc, tb)
        tb = ''.join(tb)
        self.exc = exc
        self.tb = '\n"""\n%s"""' % tb
    def __reduce__(self):
        return _rebuild_exc, (self.exc, self.tb)

def _rebuild_exc(exc, tb):
    exc.__cause__ = _RemoteTraceback(tb)
    return exc

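# Illustrative sketch of the round-trip performed by the classes above
# (assumes `import pickle`): pickling an _ExceptionWithTraceback in a worker
# and unpickling it in the parent routes through __reduce__() and
# _rebuild_exc(), so the parent gets the original exception with the formatted
# remote traceback attached as its __cause__:
#
#     try:
#         1 / 0
#     except ZeroDivisionError as e:
#         wrapped = _ExceptionWithTraceback(e, e.__traceback__)
#     rebuilt = pickle.loads(pickle.dumps(wrapped))      # ZeroDivisionError
#     isinstance(rebuilt.__cause__, _RemoteTraceback)    # True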
class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

class _ResultItem(object):
    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result

class _CallItem(object):
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs


class _SafeQueue(Queue):
    """Safe Queue that sets the exception on the future linked to the failing job"""
    def __init__(self, max_size=0, *, ctx, pending_work_items):
        self.pending_work_items = pending_work_items
        super().__init__(max_size, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        if isinstance(obj, _CallItem):
            tb = traceback.format_exception(type(e), e, e.__traceback__)
            e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
            work_item = self.pending_work_items.pop(obj.work_id, None)
            # work_item can be None if another process terminated. In this
            # case, the queue_manager_thread fails all work_items with
            # BrokenProcessPool
            if work_item is not None:
                work_item.future.set_exception(e)
        else:
            super()._on_queue_feeder_error(e, obj)


def _get_chunks(*iterables, chunksize):
    """ Iterates over zip()ed iterables in chunks. """
    it = zip(*iterables)
    while True:
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield chunk

def _process_chunk(fn, chunk):
    """ Processes a chunk of an iterable passed to map.

    Runs the function passed to map() on a chunk of the
    iterable passed to map.

    This function is run in a separate process.

    """
    return [fn(*args) for args in chunk]

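# For example (illustrative only), mapping over a single iterable of five
# items with chunksize=2 produces three chunks, each handled by one
# _process_chunk call in a worker:
#
#     list(_get_chunks([1, 2, 3, 4, 5], chunksize=2))
#     # -> [((1,), (2,)), ((3,), (4,)), ((5,),)]
#     _process_chunk(abs, ((1,), (2,)))
#     # -> [abs(1), abs(2)] == [1, 2]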

def _sendback_result(result_queue, work_id, result=None, exception=None):
    """Safely send back the given result or exception"""
    try:
        result_queue.put(_ResultItem(work_id, result=result,
                                     exception=exception))
    except BaseException as e:
        exc = _ExceptionWithTraceback(e, e.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=exc))


def _process_worker(call_queue, result_queue, initializer, initargs):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A ctx.Queue of _ResultItems that will be written
            to by the worker.
        initializer: A callable initializer, or None
        initargs: A tuple of args for the initializer
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            # The parent will notice that the process stopped and
            # mark the pool broken
            return
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id, exception=exc)
        else:
            _sendback_result(result_queue, call_item.work_id, result=r)

        # Free the resource as soon as possible, to avoid holding onto
        # open files or shared memory that is not needed anymore
        del call_item


def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Fills call_queue with _WorkItems from pending_work_items.

    This function never blocks.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while True:
        if call_queue.full():
            return
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        else:
            work_item = pending_work_items[work_id]

            if work_item.future.set_running_or_notify_cancel():
                call_queue.put(_CallItem(work_id,
                                         work_item.fn,
                                         work_item.args,
                                         work_item.kwargs),
                               block=True)
            else:
                del pending_work_items[work_id]
                continue


def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue,
                             thread_wakeup):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A dict mapping process ids to the ctx.Process instances
            used as workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A ctx.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A ctx.SimpleQueue of _ResultItems generated by the
            process workers.
        thread_wakeup: A _ThreadWakeup to allow waking up the
            queue_manager_thread from the main Thread and avoid deadlocks
            caused by permanently locked queues.
    """
    executor = None

    def shutting_down():
        return (_global_shutdown or executor is None
                or executor._shutdown_thread)

    def shutdown_worker():
        # This is an upper bound on the number of children alive.
        n_children_alive = sum(p.is_alive() for p in processes.values())
        n_children_to_stop = n_children_alive
        n_sentinels_sent = 0
        # Send the right number of sentinels, to make sure all children are
        # properly terminated.
        while n_sentinels_sent < n_children_to_stop and n_children_alive > 0:
            for i in range(n_children_to_stop - n_sentinels_sent):
                try:
                    call_queue.put_nowait(None)
                    n_sentinels_sent += 1
                except Full:
                    break
            n_children_alive = sum(p.is_alive() for p in processes.values())

        # Release the queue's resources as soon as possible.
        call_queue.close()
        # If .join() is not called on the created processes then
        # some ctx.Queue methods may deadlock on Mac OS X.
        for p in processes.values():
            p.join()

    result_reader = result_queue._reader
    wakeup_reader = thread_wakeup._reader
    readers = [result_reader, wakeup_reader]

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        # Wait for a result to be ready in the result_queue while checking
        # that all worker processes are still running, or for a wake up
        # signal to be sent. The wake up signals come either from new tasks
        # being submitted, from the executor being shutdown/gc-ed, or from
        # the shutdown of the python interpreter.
        worker_sentinels = [p.sentinel for p in processes.values()]
        ready = wait(readers + worker_sentinels)

        cause = None
        is_broken = True
        if result_reader in ready:
            try:
                result_item = result_reader.recv()
                is_broken = False
            except BaseException as e:
                cause = traceback.format_exception(type(e), e, e.__traceback__)

        elif wakeup_reader in ready:
            is_broken = False
            result_item = None
            thread_wakeup.clear()
        if is_broken:
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = ('A child process terminated '
                                    'abruptly, the process pool is not '
                                    'usable anymore')
                executor._shutdown_thread = True
                executor = None
            bpe = BrokenProcessPool("A process in the process pool was "
                                    "terminated abruptly while the future was "
                                    "running or pending.")
            if cause is not None:
                bpe.__cause__ = _RemoteTraceback(
                    f"\n'''\n{''.join(cause)}'''")
            # All futures in flight must be marked failed
            for work_id, work_item in pending_work_items.items():
                work_item.future.set_exception(bpe)
                # Delete references to object. See issue16284
                del work_item
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return
        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284
                del work_item
            # Delete reference to result_item
            del result_item

        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if shutting_down():
            try:
                # Flag the executor as shutting down as early as possible if
                # it is not gc-ed yet.
                if executor is not None:
                    executor._shutdown_thread = True
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (in
                # result_queue.get()) and be able to send a sentinel again.
                pass
        executor = None


_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
    _system_limits_checked = True
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # undetermined limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores (%d"
                       " available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)


def _chain_from_iterable_of_lists(iterable):
    """
    Specialized implementation of itertools.chain.from_iterable.
    Each item in *iterable* should be a list. This function is
    careful not to keep references to yielded objects.
    """
    for element in iterable:
        element.reverse()
        while element:
            yield element.pop()

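# Illustrative behaviour sketch: each input list is consumed destructively so
# the generator does not keep the yielded objects alive:
#
#     chunks = [[1, 2], [3]]
#     list(_chain_from_iterable_of_lists(chunks))   # -> [1, 2, 3]
#     chunks                                        # -> [[], []]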

class BrokenProcessPool(_base.BrokenExecutor):
    """
    Raised when a process in a ProcessPoolExecutor terminated abruptly
    while a future was in the running state.
    """


class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=()):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
            mp_context: A multiprocessing context to launch the workers. This
                object should provide SimpleQueue, Queue and Process.
            initializer: A callable used to initialize worker processes.
            initargs: A tuple of arguments to pass to the initializer.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.cpu_count() or 1
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")

            self._max_workers = max_workers

        if mp_context is None:
            mp_context = mp.get_context()
        self._mp_context = mp_context

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")
        self._initializer = initializer
        self._initargs = initargs

        # Management thread
        self._queue_management_thread = None

        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._broken = False
        self._queue_count = 0
        self._pending_work_items = {}

        # Create communication channels for the executor
        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = _SafeQueue(
            max_size=queue_size, ctx=self._mp_context,
            pending_work_items=self._pending_work_items)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = mp_context.SimpleQueue()
        self._work_ids = queue.Queue()

        # _ThreadWakeup is a communication channel used to interrupt the wait
        # of the main loop of queue_manager_thread from another thread (e.g.
        # when calling executor.submit or executor.shutdown). We do not use the
        # _result_queue to send the wakeup signal to the queue_manager_thread
        # as it could result in a deadlock if a worker process dies with the
        # _result_queue write lock still acquired.
        self._queue_management_thread_wakeup = _ThreadWakeup()

    def _start_queue_management_thread(self):
        if self._queue_management_thread is None:
            # When the executor gets garbage collected, the weakref callback
            # will wake up the queue management thread so that it can terminate
            # if there is no pending work item.
            def weakref_cb(_,
                           thread_wakeup=self._queue_management_thread_wakeup):
                mp.util.debug('Executor collected: triggering callback for'
                              ' QueueManager wakeup')
                thread_wakeup.wakeup()
            # Start the processes so that their sentinels are known.
            self._adjust_process_count()
            self._queue_management_thread = threading.Thread(
                target=_queue_management_worker,
                args=(weakref.ref(self, weakref_cb),
                      self._processes,
                      self._pending_work_items,
                      self._work_ids,
                      self._call_queue,
                      self._result_queue,
                      self._queue_management_thread_wakeup),
                name="QueueManagerThread")
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            _threads_wakeups[self._queue_management_thread] = \
                self._queue_management_thread_wakeup

    def _adjust_process_count(self):
        for _ in range(len(self._processes), self._max_workers):
            p = self._mp_context.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue,
                      self._initializer,
                      self._initargs))
            p.start()
            self._processes[p.pid] = p

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool(self._broken)
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _global_shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._queue_management_thread_wakeup.wakeup()

            self._start_queue_management_thread()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, *iterables).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the iterables will be sent one at
                a time.

        Returns:
            An iterator equivalent to: map(fn, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        results = super().map(partial(_process_chunk, fn),
                              _get_chunks(*iterables, chunksize=chunksize),
                              timeout=timeout)
        return _chain_from_iterable_of_lists(results)

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if self._queue_management_thread:
            # Wake up queue management thread
            self._queue_management_thread_wakeup.wakeup()
            if wait:
                self._queue_management_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        if self._call_queue is not None:
            self._call_queue.close()
            if wait:
                self._call_queue.join_thread()
            self._call_queue = None
        self._result_queue = None
        self._processes = None

        if self._queue_management_thread_wakeup:
            self._queue_management_thread_wakeup.close()
            self._queue_management_thread_wakeup = None

    shutdown.__doc__ = _base.Executor.shutdown.__doc__

atexit.register(_python_exit)