blob: 36355ae8756dbca67121fd55bdd2ba6b15e2491d [file] [log] [blame]
Brian Quinlan81c4d362010-09-18 22:35:02 +00001# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4"""Implements ProcessPoolExecutor.
5
Thomas Graingerf938d8b2019-04-12 17:17:17 +01006The following diagram and text describe the data-flow through the system:
Brian Quinlan81c4d362010-09-18 22:35:02 +00007
|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+
26
27Executor.submit() called:
28- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
29- adds the id of the _WorkItem to the "Work Ids" queue
30
31Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  _WorkItem from the "Work Items" dict: if the work item has been cancelled
  then it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
38- reads _ResultItems from "Result Q", updates the future stored in the
39 "Work Items" dict and deletes the dict entry
40
41Process #1..n:
42- reads _CallItems from "Call Q", executes the calls, and puts the resulting
Mark Dickinson5ee24042012-10-20 13:16:49 +010043 _ResultItems in "Result Q"
Brian Quinlan81c4d362010-09-18 22:35:02 +000044"""
45
46__author__ = 'Brian Quinlan (brian@sweetapp.com)'
47
48import atexit
Antoine Pitroudd696492011-06-08 17:21:55 +020049import os
Brian Quinlan81c4d362010-09-18 22:35:02 +000050from concurrent.futures import _base
51import queue
Thomas Moreaue8c368d2017-10-03 11:53:17 +020052import multiprocessing as mp
Brian Quinlanf7bda5c2019-05-07 13:31:11 -040053import multiprocessing.connection
Thomas Moreau94459fd2018-01-05 11:15:54 +010054from multiprocessing.queues import Queue
Brian Quinlan81c4d362010-09-18 22:35:02 +000055import threading
56import weakref
Antoine Pitrou4aae2762014-10-04 20:20:10 +020057from functools import partial
58import itertools
Brian Quinlan39889862019-05-08 14:04:53 -040059import sys
Antoine Pitrou1285c9b2015-01-17 20:02:14 +010060import traceback
Brian Quinlan81c4d362010-09-18 22:35:02 +000061
Brian Quinlan81c4d362010-09-18 22:35:02 +000062
# Maps each started executor manager thread to the _ThreadWakeup that can
# interrupt its wait. _python_exit() walks this mapping to wake up and join
# every manager thread. Keys are weak references, so an entry does not keep
# its thread object alive.
_threads_wakeups = weakref.WeakKeyDictionary()
# Set to True by _python_exit(). Once set, submit() refuses new work and the
# manager threads consider their executor to be shutting down.
_global_shutdown = False
Brian Quinlan81c4d362010-09-18 22:35:02 +000065
Thomas Moreau94459fd2018-01-05 11:15:54 +010066
class _ThreadWakeup:
    """One-way pipe used as a wakeup signal.

    wakeup() writes an empty message into the pipe and clear() drains every
    message currently readable. After close() has been called, all three
    methods silently do nothing.
    """

    def __init__(self):
        self._closed = False
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        """Close both ends of the pipe; repeated calls are no-ops."""
        if self._closed:
            return
        self._closed = True
        self._writer.close()
        self._reader.close()

    def wakeup(self):
        """Send one empty message unless the pipe is already closed."""
        if self._closed:
            return
        self._writer.send_bytes(b"")

    def clear(self):
        """Consume all pending messages unless the pipe is already closed."""
        if self._closed:
            return
        reader = self._reader
        while reader.poll():
            reader.recv_bytes()
Thomas Moreau94459fd2018-01-05 11:15:54 +010086
87
def _python_exit():
    """Flag interpreter shutdown, then wake up and join all manager threads."""
    global _global_shutdown
    _global_shutdown = True
    # Work on a snapshot: the mapping has weak keys and may change while we
    # are iterating.
    registered = tuple(_threads_wakeups.items())
    # Wake every manager thread first so they can all wind down in
    # parallel, and only then wait for each of them.
    for entry in registered:
        entry[1].wakeup()
    for entry in registered:
        entry[0].join()
Brian Quinlan81c4d362010-09-18 22:35:02 +000096
# Register for `_python_exit()` to be called just before joining all
# non-daemon threads. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context.
threading._register_atexit(_python_exit)

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
# ProcessPoolExecutor.__init__ adds this value to max_workers to obtain the
# size of the call queue.
EXTRA_QUEUED_CALLS = 1


# On Windows, WaitForMultipleObjects is used to wait for processes to finish.
# It can wait on, at most, 63 objects. There is an overhead of two objects:
# - the result queue reader
# - the thread wakeup reader
# ProcessPoolExecutor.__init__ caps the default worker count at this value on
# win32 and rejects a larger explicit max_workers.
_MAX_WINDOWS_WORKERS = 63 - 2
115
# Hack to embed stringification of remote traceback in local traceback

class _RemoteTraceback(Exception):
    """Exception whose string form is an already formatted traceback."""

    def __init__(self, tb):
        self.tb = tb

    def __str__(self):
        return self.tb


class _ExceptionWithTraceback:
    """Carries an exception plus its formatted traceback through pickling.

    Unpickling does not recreate this wrapper: it yields the wrapped
    exception itself, with the traceback text attached as its __cause__.
    """

    def __init__(self, exc, tb):
        formatted = ''.join(traceback.format_exception(type(exc), exc, tb))
        self.exc = exc
        self.tb = '\n"""\n%s"""' % formatted

    def __reduce__(self):
        rebuild_args = (self.exc, self.tb)
        return _rebuild_exc, rebuild_args


def _rebuild_exc(exc, tb):
    """Set a _RemoteTraceback built from *tb* as the cause of *exc*."""
    remote_cause = _RemoteTraceback(tb)
    exc.__cause__ = remote_cause
    return exc
136
class _WorkItem:
    """A submitted call (fn, args, kwargs) bundled with its future."""

    def __init__(self, future, fn, args, kwargs):
        self.future, self.fn = future, fn
        self.args, self.kwargs = args, kwargs
143
class _ResultItem:
    """Outcome of one call: the work id plus an exception or a result."""

    def __init__(self, work_id, exception=None, result=None):
        self.work_id, self.exception, self.result = work_id, exception, result
149
class _CallItem:
    """A call (fn, args, kwargs) tagged with the id of its work item."""

    def __init__(self, work_id, fn, args, kwargs):
        self.work_id, self.fn = work_id, fn
        self.args, self.kwargs = args, kwargs
156
Antoine Pitrou63ff4132017-11-04 11:05:49 +0100157
class _SafeQueue(Queue):
    """Queue that reports feeder errors on the future of the failed job."""

    def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
        self.pending_work_items = pending_work_items
        self.thread_wakeup = thread_wakeup
        super().__init__(max_size, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        # Anything that is not a _CallItem gets the default treatment.
        if not isinstance(obj, _CallItem):
            super()._on_queue_feeder_error(e, obj)
            return

        # Attach the formatted traceback of the feeder error as its cause.
        lines = traceback.format_exception(type(e), e, e.__traceback__)
        e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(lines)))
        failed_item = self.pending_work_items.pop(obj.work_id, None)
        self.thread_wakeup.wakeup()
        # failed_item can be None if another process terminated. In this
        # case, the executor_manager_thread fails all work_items
        # with BrokenProcessPool
        if failed_item is None:
            return
        failed_item.future.set_exception(e)
178
179
def _get_chunks(*iterables, chunksize):
    """ Yields tuples of up to chunksize items taken from zip(*iterables). """
    zipped = zip(*iterables)

    def take_chunk():
        return tuple(itertools.islice(zipped, chunksize))

    # Keep pulling chunks until an empty one signals exhaustion.
    yield from iter(take_chunk, ())
188
Thomas Moreau0e890762020-03-01 21:49:14 +0100189
def _process_chunk(fn, chunk):
    """ Applies fn to every argument tuple of one chunk produced for map().

    Returns the list of results, in the order of the chunk.

    This function is run in a separate process.

    """
    return list(itertools.starmap(fn, chunk))
200
Thomas Moreau94459fd2018-01-05 11:15:54 +0100201
def _sendback_result(result_queue, work_id, result=None, exception=None):
    """Safely send back the given result or exception"""
    try:
        item = _ResultItem(work_id, exception, result)
        result_queue.put(item)
    except BaseException as err:
        # Sending the real outcome failed: report that failure instead.
        fallback = _ExceptionWithTraceback(err, err.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=fallback))
210
211
def _process_worker(call_queue, result_queue, initializer, initargs):
    """Runs calls taken from call_queue and reports them on result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A ctx.Queue of _ResultItems that will be written
            to by the worker.
        initializer: A callable initializer, or None
        initargs: A tuple of args for the initializer
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            # The parent will notice that the process stopped and
            # mark the pool broken
            return

    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Exit sentinel: send our pid back, which also wakes up the
            # queue management thread.
            result_queue.put(os.getpid())
            return

        work_id = call_item.work_id
        try:
            outcome = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as err:
            wrapped = _ExceptionWithTraceback(err, err.__traceback__)
            _sendback_result(result_queue, work_id, exception=wrapped)
        else:
            _sendback_result(result_queue, work_id, result=outcome)
            del outcome

        # Liberate the resource as soon as possible, to avoid holding onto
        # open files or shared memory that is not needed anymore
        del call_item
251
252
class _ExecutorManagerThread(threading.Thread):
    """Manages the communication between this process and the worker processes.

    The manager is run in a local thread.

    Args:
        executor: A reference to the ProcessPoolExecutor that owns
            this thread. A weakref will be owned by the manager as well as
            references to internal objects used to introspect the state of
            the executor.
    """

    def __init__(self, executor):
        # Store references to necessary internals of the executor.

        # A _ThreadWakeup to allow waking up the queue_manager_thread from the
        # main Thread and avoid deadlocks caused by permanently locked queues.
        self.thread_wakeup = executor._executor_manager_thread_wakeup

        # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
        # to determine if the ProcessPoolExecutor has been garbage collected
        # and that the manager can exit.
        # When the executor gets garbage collected, the weakref callback
        # will wake up the queue management thread so that it can terminate
        # if there is no pending work item.
        def weakref_cb(_, thread_wakeup=self.thread_wakeup):
            mp.util.debug('Executor collected: triggering callback for'
                          ' QueueManager wakeup')
            thread_wakeup.wakeup()

        self.executor_reference = weakref.ref(executor, weakref_cb)

        # A dict mapping pids to the ctx.Process instances used as workers.
        self.processes = executor._processes

        # A ctx.Queue that will be filled with _CallItems derived from
        # _WorkItems for processing by the process workers.
        self.call_queue = executor._call_queue

        # A ctx.SimpleQueue of _ResultItems generated by the process workers.
        self.result_queue = executor._result_queue

        # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        self.work_ids_queue = executor._work_ids

        # A dict mapping work ids to _WorkItems e.g.
        #     {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        # This is the very same dict object the executor and its call queue
        # use, so it must only ever be mutated in place, never rebound.
        self.pending_work_items = executor._pending_work_items

        super().__init__()

    def run(self):
        # Main loop for the executor manager thread.

        while True:
            self.add_call_item_to_queue()

            result_item, is_broken, cause = self.wait_result_broken_or_wakeup()

            if is_broken:
                self.terminate_broken(cause)
                return
            if result_item is not None:
                self.process_result_item(result_item)
                # Delete reference to result_item to avoid keeping references
                # while waiting on new results.
                del result_item

                # attempt to increment idle process count
                executor = self.executor_reference()
                if executor is not None:
                    executor._idle_worker_semaphore.release()
                del executor

            if self.is_shutting_down():
                self.flag_executor_shutting_down()

                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not self.pending_work_items:
                    self.join_executor_internals()
                    return

    def add_call_item_to_queue(self):
        # Fills call_queue with _WorkItems from pending_work_items.
        # This function never blocks.
        while True:
            if self.call_queue.full():
                return
            try:
                work_id = self.work_ids_queue.get(block=False)
            except queue.Empty:
                return
            else:
                work_item = self.pending_work_items[work_id]

                if work_item.future.set_running_or_notify_cancel():
                    self.call_queue.put(_CallItem(work_id,
                                                  work_item.fn,
                                                  work_item.args,
                                                  work_item.kwargs),
                                        block=True)
                else:
                    # The future was cancelled before it could be queued.
                    del self.pending_work_items[work_id]
                    continue

    def wait_result_broken_or_wakeup(self):
        # Wait for a result to be ready in the result_queue while checking
        # that all worker processes are still running, or for a wake up
        # signal to be sent. The wake up signals come either from new tasks
        # being submitted, from the executor being shutdown/gc-ed, or from the
        # shutdown of the python interpreter.
        #
        # Returns a (result_item, is_broken, cause) tuple. is_broken stays
        # True when neither the result reader nor the wakeup reader is ready
        # (i.e. only a worker sentinel fired) or when reading the result
        # failed; in the latter case cause holds the formatted traceback.
        result_reader = self.result_queue._reader
        wakeup_reader = self.thread_wakeup._reader
        readers = [result_reader, wakeup_reader]
        worker_sentinels = [p.sentinel for p in self.processes.values()]
        ready = mp.connection.wait(readers + worker_sentinels)

        cause = None
        is_broken = True
        result_item = None
        if result_reader in ready:
            try:
                result_item = result_reader.recv()
                is_broken = False
            except BaseException as e:
                cause = traceback.format_exception(type(e), e, e.__traceback__)

        elif wakeup_reader in ready:
            is_broken = False
            self.thread_wakeup.clear()

        return result_item, is_broken, cause

    def process_result_item(self, result_item):
        # Process the received result_item. This can be either the PID of a
        # worker that exited gracefully or a _ResultItem

        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert self.is_shutting_down()
            p = self.processes.pop(result_item)
            p.join()
            if not self.processes:
                self.join_executor_internals()
                return
        else:
            # Received a _ResultItem so mark the future as completed.
            work_item = self.pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)

    def is_shutting_down(self):
        # Check whether we should start shutting down the executor.
        executor = self.executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        return (_global_shutdown or executor is None
                or executor._shutdown_thread)

    def terminate_broken(self, cause):
        # Terminate the executor because it is in a broken state. The cause
        # argument can be used to display more information on the error that
        # led the executor into becoming broken.

        # Mark the process pool broken so that submits fail right now.
        executor = self.executor_reference()
        if executor is not None:
            executor._broken = ('A child process terminated '
                                'abruptly, the process pool is not '
                                'usable anymore')
            executor._shutdown_thread = True
            executor = None

        # All pending tasks are to be marked failed with the following
        # BrokenProcessPool error
        bpe = BrokenProcessPool("A process in the process pool was "
                                "terminated abruptly while the future was "
                                "running or pending.")
        if cause is not None:
            bpe.__cause__ = _RemoteTraceback(
                f"\n'''\n{''.join(cause)}'''")

        # Mark pending tasks as failed.
        for work_item in self.pending_work_items.values():
            work_item.future.set_exception(bpe)
            # Delete references to object. See issue16284
            del work_item
        self.pending_work_items.clear()

        # Terminate remaining workers forcibly: the queues or their
        # locks may be in a dirty state and block forever.
        for p in self.processes.values():
            p.terminate()

        # clean up resources
        self.join_executor_internals()

    def flag_executor_shutting_down(self):
        # Flag the executor as shutting down and cancel remaining tasks if
        # requested as early as possible if it is not gc-ed yet.
        executor = self.executor_reference()
        if executor is not None:
            executor._shutdown_thread = True
            # Cancel pending work items if requested.
            if executor._cancel_pending_futures:
                # Cancel all pending futures and prune pending_work_items so
                # that it only holds futures that are currently running.
                # The dict is pruned in place instead of being replaced by a
                # new one: the executor and the call queue's feeder-error
                # handler hold references to this same dict, and an entry
                # removed there must also disappear from the manager's view,
                # otherwise the manager would wait forever for it.
                for work_id, work_item in list(
                        self.pending_work_items.items()):
                    if work_item.future.cancel():
                        self.pending_work_items.pop(work_id, None)
                # Drain work_ids_queue since we no longer need to
                # add items to the call queue.
                while True:
                    try:
                        self.work_ids_queue.get_nowait()
                    except queue.Empty:
                        break
                # Make sure we do this only once to not waste time looping
                # on running processes over and over.
                executor._cancel_pending_futures = False

    def shutdown_workers(self):
        n_children_to_stop = self.get_n_children_alive()
        n_sentinels_sent = 0
        # Send the right number of sentinels, to make sure all children are
        # properly terminated.
        while (n_sentinels_sent < n_children_to_stop
                and self.get_n_children_alive() > 0):
            for i in range(n_children_to_stop - n_sentinels_sent):
                try:
                    self.call_queue.put_nowait(None)
                    n_sentinels_sent += 1
                except queue.Full:
                    break

    def join_executor_internals(self):
        self.shutdown_workers()
        # Release the queue's resources as soon as possible.
        self.call_queue.close()
        self.call_queue.join_thread()
        self.thread_wakeup.close()
        # If .join() is not called on the created processes then
        # some ctx.Queue methods may deadlock on Mac OS X.
        for p in self.processes.values():
            p.join()

    def get_n_children_alive(self):
        # This is an upper bound on the number of children alive.
        return sum(p.is_alive() for p in self.processes.values())
Brian Quinlan81c4d362010-09-18 22:35:02 +0000512
Thomas Moreau94459fd2018-01-05 11:15:54 +0100513
# Cache for _check_system_limits(): whether the check already ran, and the
# error message if the platform turned out to be too limited (else None).
_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    """Raise NotImplementedError if the system provides too few semaphores.

    The limit is read from os.sysconf("SC_SEM_NSEMS_MAX") on the first call
    only; the outcome is cached in module globals, so later calls either
    return immediately or re-raise with the cached message.

    Raises:
        NotImplementedError: If fewer than 256 semaphores are available.
    """
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
        # Already verified that the system is not limited: do not query
        # sysconf again.
        return
    _system_limits_checked = True
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # indetermined limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores (%d"
                       " available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)
540
Antoine Pitroudd696492011-06-08 17:21:55 +0200541
def _chain_from_iterable_of_lists(iterable):
    """
    Flatten an iterable of lists, like itertools.chain.from_iterable.
    Every list is consumed destructively (reversed in place, then popped
    until empty) so that no reference to a yielded object is kept.
    """
    for batch in iterable:
        batch.reverse()
        pop_next = batch.pop
        while batch:
            yield pop_next()
552
553
class BrokenProcessPool(_base.BrokenExecutor):
    """
    Raised when a process in a ProcessPoolExecutor terminated abruptly
    while a future was in the running or pending state.
    """
559
560
Brian Quinlan81c4d362010-09-18 22:35:02 +0000561class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None, mp_context=None,
             initializer=None, initargs=()):
    """Initializes a new ProcessPoolExecutor instance.

    Args:
        max_workers: The maximum number of processes that can be used to
            execute the given calls. If None or not given then as many
            worker processes will be created as the machine has processors.
        mp_context: A multiprocessing context to launch the workers. This
            object should provide SimpleQueue, Queue and Process.
        initializer: A callable used to initialize worker processes.
        initargs: A tuple of arguments to pass to the initializer.
    """
    _check_system_limits()

    on_windows = sys.platform == 'win32'
    if max_workers is None:
        # Default to one worker per processor, capped on Windows.
        worker_count = os.cpu_count() or 1
        if on_windows:
            worker_count = min(_MAX_WINDOWS_WORKERS, worker_count)
        self._max_workers = worker_count
    else:
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")
        if on_windows and max_workers > _MAX_WINDOWS_WORKERS:
            raise ValueError(
                f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")
        self._max_workers = max_workers

    self._mp_context = mp.get_context() if mp_context is None else mp_context

    if not (initializer is None or callable(initializer)):
        raise TypeError("initializer must be a callable")
    self._initializer = initializer
    self._initargs = initargs

    # Management thread
    self._executor_manager_thread = None

    # Map of pids to processes
    self._processes = {}

    # Shutdown is a two-step process.
    self._shutdown_thread = False
    self._shutdown_lock = threading.Lock()
    self._idle_worker_semaphore = threading.Semaphore(0)
    self._broken = False
    self._queue_count = 0
    self._pending_work_items = {}
    self._cancel_pending_futures = False

    # _ThreadWakeup is a communication channel used to interrupt the wait
    # of the main loop of executor_manager_thread from another thread (e.g.
    # when calling executor.submit or executor.shutdown). We do not use the
    # _result_queue to send wakeup signals to the executor_manager_thread
    # as it could result in a deadlock if a worker process dies with the
    # _result_queue write lock still acquired.
    self._executor_manager_thread_wakeup = _ThreadWakeup()

    # Create communication channels for the executor
    # Make the call queue slightly larger than the number of processes to
    # prevent the worker processes from idling. But don't make it too big
    # because futures in the call queue cannot be cancelled.
    self._call_queue = _SafeQueue(
        max_size=self._max_workers + EXTRA_QUEUED_CALLS,
        ctx=self._mp_context,
        pending_work_items=self._pending_work_items,
        thread_wakeup=self._executor_manager_thread_wakeup)
    # Killed worker processes can produce spurious "broken pipe"
    # tracebacks in the queue's own worker thread. But we detect killed
    # processes anyway, so silence the tracebacks.
    self._call_queue._ignore_epipe = True
    self._result_queue = self._mp_context.SimpleQueue()
    self._work_ids = queue.Queue()
639
def _start_executor_manager_thread(self):
    """Create, start and register the manager thread on first call only."""
    if self._executor_manager_thread is not None:
        return
    manager = _ExecutorManagerThread(self)
    self._executor_manager_thread = manager
    manager.start()
    # Register the thread so that _python_exit() can wake it up and join it.
    _threads_wakeups[manager] = self._executor_manager_thread_wakeup
Brian Quinlan81c4d362010-09-18 22:35:02 +0000647
def _adjust_process_count(self):
    """Start one more worker unless one is idle or the pool is full."""
    # if there's an idle process, we don't need to spawn a new one.
    if self._idle_worker_semaphore.acquire(blocking=False):
        return

    if len(self._processes) >= self._max_workers:
        return

    worker = self._mp_context.Process(
        target=_process_worker,
        args=(self._call_queue, self._result_queue,
              self._initializer, self._initargs))
    worker.start()
    self._processes[worker.pid] = worker
Brian Quinlan81c4d362010-09-18 22:35:02 +0000663
def submit(self, fn, /, *args, **kwargs):
    with self._shutdown_lock:
        # Refuse new work on a broken or shut down executor.
        if self._broken:
            raise BrokenProcessPool(self._broken)
        if self._shutdown_thread:
            raise RuntimeError('cannot schedule new futures after shutdown')
        if _global_shutdown:
            raise RuntimeError('cannot schedule new futures after '
                               'interpreter shutdown')

        future = _base.Future()
        work_id = self._queue_count

        # Record the work item under a fresh id and queue that id.
        self._pending_work_items[work_id] = _WorkItem(future, fn, args, kwargs)
        self._work_ids.put(work_id)
        self._queue_count = work_id + 1
        # Wake up queue management thread
        self._executor_manager_thread_wakeup.wakeup()

        self._adjust_process_count()
        self._start_executor_manager_thread()
        return future
submit.__doc__ = _base.Executor.submit.__doc__
687
def map(self, fn, *iterables, timeout=None, chunksize=1):
    """Return an iterator equivalent to map(fn, *iterables).

    The input iterables are cut into chunks and each chunk is submitted to
    the process pool as a single task.

    Args:
        fn: A callable that will take as many arguments as there are
            passed iterables.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        chunksize: Number of items grouped into each task sent to the
            process pool. With the default of one, items are sent one at
            a time.

    Returns:
        An iterator equivalent to: map(fn, *iterables) but the calls may
        be evaluated out-of-order.

    Raises:
        ValueError: If chunksize is smaller than one.
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
        Exception: If fn(*args) raises for any values.
    """
    if chunksize < 1:
        raise ValueError("chunksize must be >= 1.")

    # Each submitted task applies ``fn`` to a whole chunk of argument tuples.
    run_chunk = partial(_process_chunk, fn)
    chunked_args = _get_chunks(*iterables, chunksize=chunksize)
    per_chunk_results = super().map(run_chunk, chunked_args, timeout=timeout)

    # Flatten the per-chunk result lists back into a single result stream.
    return _chain_from_iterable_of_lists(per_chunk_results)
Antoine Pitrou4aae2762014-10-04 20:20:10 +0200716
def shutdown(self, wait=True, *, cancel_futures=False):
    # NOTE: no docstring here on purpose; ``__doc__`` is copied from
    # ``_base.Executor.shutdown`` right after the definition.
    #
    # Publish the shutdown request under the lock.
    with self._shutdown_lock:
        self._cancel_pending_futures = cancel_futures
        self._shutdown_thread = True

    manager_thread = self._executor_manager_thread
    if manager_thread:
        # Wake up the executor manager thread.
        self._executor_manager_thread_wakeup.wakeup()
        if wait:
            manager_thread.join()

    # To reduce the risk of opening too many files, drop the references to
    # objects that use file descriptors.
    for attr_name in ("_executor_manager_thread",
                      "_call_queue",
                      "_result_queue",
                      "_processes"):
        setattr(self, attr_name, None)

    if self._executor_manager_thread_wakeup:
        self._executor_manager_thread_wakeup = None
shutdown.__doc__ = _base.Executor.shutdown.__doc__