blob: 39fadcce027c28712e541334af9beacddbcf83fa [file] [log] [blame]
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  WorkItem from the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
"""
45
__author__ = "Brian Quinlan (brian@sweetapp.com)"
47
48import atexit
Antoine Pitroudd696492011-06-08 17:21:55 +020049import os
Brian Quinlan81c4d362010-09-18 22:35:02 +000050from concurrent.futures import _base
51import queue
Thomas Moreaue8c368d2017-10-03 11:53:17 +020052import multiprocessing as mp
Brian Quinlanf7bda5c2019-05-07 13:31:11 -040053import multiprocessing.connection
Thomas Moreau94459fd2018-01-05 11:15:54 +010054from multiprocessing.queues import Queue
Brian Quinlan81c4d362010-09-18 22:35:02 +000055import threading
56import weakref
Antoine Pitrou4aae2762014-10-04 20:20:10 +020057from functools import partial
58import itertools
Brian Quinlan39889862019-05-08 14:04:53 -040059import sys
Antoine Pitrou1285c9b2015-01-17 20:02:14 +010060import traceback
Brian Quinlan81c4d362010-09-18 22:35:02 +000061
# Worker threads and processes are daemonic so that the interpreter can still
# exit while a ProcessPoolExecutor has idle workers (i.e. shutdown() was never
# called). Simply letting the workers die with the interpreter would be
# undesirable for two reasons:
#   - the workers would keep running during interpreter shutdown, where they
#     would fail in unpredictable ways;
#   - a worker could be killed in the middle of a work item, which could be
#     bad when the callable has external side-effects, e.g. writing to a file.
#
# To work around this, an exit handler is installed which asks the workers to
# exit once their work queues are empty and then waits until the
# threads/processes finish.

# Maps each executor manager thread (held weakly) to the _ThreadWakeup object
# used to wake that thread up.
_threads_wakeups = weakref.WeakKeyDictionary()
# Becomes True once the exit handler has run.
_global_shutdown = False
Brian Quinlan81c4d362010-09-18 22:35:02 +000078
Thomas Moreau94459fd2018-01-05 11:15:54 +010079
class _ThreadWakeup:
    """One-way pipe used to interrupt a thread blocked waiting on its reader.

    Every method becomes a no-op once close() has been called.
    """

    def __init__(self):
        self._closed = False
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        """Close both ends of the pipe; calling it again does nothing."""
        if self._closed:
            return
        self._closed = True
        self._writer.close()
        self._reader.close()

    def wakeup(self):
        """Write an empty message so that the reader end becomes ready."""
        if self._closed:
            return
        self._writer.send_bytes(b"")

    def clear(self):
        """Consume every message currently pending on the reader end."""
        if self._closed:
            return
        while self._reader.poll():
            self._reader.recv_bytes()
Thomas Moreau94459fd2018-01-05 11:15:54 +010099
100
def _python_exit():
    """Exit handler: flag the shutdown, wake all manager threads, join them."""
    global _global_shutdown
    _global_shutdown = True
    # Work on a snapshot: _threads_wakeups holds its keys weakly.
    registered = list(_threads_wakeups.items())
    # Every thread is woken up before any of them is joined.
    for entry in registered:
        entry[1].wakeup()
    for entry in registered:
        entry[0].join()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000109
# Number of calls, beyond one per worker process, that may sit in the call
# queue. A smaller value means processes spend more time idle waiting for
# work; a larger value makes Future.cancel() succeed less frequently, because
# futures already in the call queue cannot be cancelled.
EXTRA_QUEUED_CALLS = 1


# On Windows, WaitForMultipleObjects is used to wait for processes to finish
# and it can wait on, at most, 63 objects. Two of those are taken by:
#   - the result queue reader
#   - the thread wakeup reader
_MAX_WINDOWS_WORKERS = 63 - 2
122
# Hack: the stringified traceback of a remote exception is embedded in the
# local traceback by attaching it as the exception's __cause__.

class _RemoteTraceback(Exception):
    """Exception whose str() is an already formatted traceback text."""

    def __init__(self, tb):
        self.tb = tb

    def __str__(self):
        return self.tb
130
class _ExceptionWithTraceback:
    """Bundles an exception with the text of its formatted traceback."""

    def __init__(self, exc, tb):
        formatted = ''.join(traceback.format_exception(type(exc), exc, tb))
        self.exc = exc
        self.tb = '\n"""\n%s"""' % formatted

    def __reduce__(self):
        # Unpickling calls _rebuild_exc(exc, tb) and therefore yields the
        # exception itself instead of an instance of this wrapper.
        return _rebuild_exc, (self.exc, self.tb)
139
def _rebuild_exc(exc, tb):
    """Return *exc* with a _RemoteTraceback built from *tb* as its cause."""
    remote = _RemoteTraceback(tb)
    exc.__cause__ = remote
    return exc
143
class _WorkItem:
    """A submitted call together with the future that will hold its outcome."""

    def __init__(self, future, fn, args, kwargs):
        self.future, self.fn = future, fn
        self.args, self.kwargs = args, kwargs
150
class _ResultItem:
    """Outcome (result or exception) of the call identified by work_id."""

    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception, self.result = exception, result
156
class _CallItem:
    """A call (fn, args, kwargs) tagged with the id of its work item."""

    def __init__(self, work_id, fn, args, kwargs):
        self.work_id, self.fn = work_id, fn
        self.args, self.kwargs = args, kwargs
163
Antoine Pitrou63ff4132017-11-04 11:05:49 +0100164
class _SafeQueue(Queue):
    """Queue that reports feeder errors on the future of the failed job."""

    def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
        self.pending_work_items = pending_work_items
        self.thread_wakeup = thread_wakeup
        super().__init__(max_size, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        # Anything that is not a _CallItem gets the default handling.
        if not isinstance(obj, _CallItem):
            super()._on_queue_feeder_error(e, obj)
            return

        formatted = ''.join(
            traceback.format_exception(type(e), e, e.__traceback__))
        e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(formatted))
        work_item = self.pending_work_items.pop(obj.work_id, None)
        self.thread_wakeup.wakeup()
        # work_item can be None if another process terminated. In this
        # case, the executor_manager_thread fails all work_items
        # with BrokenProcessPool
        if work_item is not None:
            work_item.future.set_exception(e)
185
186
def _get_chunks(*iterables, chunksize):
    """Yield tuples of at most *chunksize* items taken from zip(*iterables)."""
    zipped = zip(*iterables)
    while chunk := tuple(itertools.islice(zipped, chunksize)):
        yield chunk
195
Thomas Moreau0e890762020-03-01 21:49:14 +0100196
def _process_chunk(fn, chunk):
    """Apply *fn* to every argument tuple of *chunk* and return the results.

    *chunk* is one chunk of the zipped iterables passed to map(); each of
    its items is unpacked into the positional arguments of *fn*.

    This function is run in a separate process.
    """
    return list(itertools.starmap(fn, chunk))
207
Thomas Moreau94459fd2018-01-05 11:15:54 +0100208
def _sendback_result(result_queue, work_id, result=None, exception=None):
    """Put a _ResultItem for *work_id* on *result_queue*.

    If sending the item fails, a _ResultItem carrying that failure (wrapped
    with its traceback) is sent instead.
    """
    try:
        result_queue.put(
            _ResultItem(work_id, result=result, exception=exception))
    except BaseException as send_error:
        wrapped = _ExceptionWithTraceback(send_error,
                                          send_error.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=wrapped))
217
218
def _process_worker(call_queue, result_queue, initializer, initargs):
    """Run the calls read from call_queue, sending outcomes to result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker. A None item tells the worker to exit.
        result_queue: A ctx.Queue of _ResultItems that will be written
            to by the worker.
        initializer: A callable initializer, or None
        initargs: A tuple of args for the initializer
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            # The parent will notice that the process stopped and
            # mark the pool broken
            return

    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Exit request: send our pid back, which also wakes up the
            # queue management thread.
            result_queue.put(os.getpid())
            return

        try:
            outcome = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id, exception=exc)
        else:
            _sendback_result(result_queue, call_item.work_id, result=outcome)
            del outcome

        # Liberate the resource as soon as possible, to avoid holding onto
        # open files or shared memory that is not needed anymore
        del call_item
258
259
class _ExecutorManagerThread(threading.Thread):
    """Manages the communication between this process and the worker processes.

    The manager is run in a local thread.

    Args:
        executor: A reference to the ProcessPoolExecutor that owns
            this thread. A weakref will be owned by the manager as well as
            references to internal objects used to introspect the state of
            the executor.
    """

    def __init__(self, executor):
        # Store references to necessary internals of the executor.

        # A _ThreadWakeup to allow waking up the queue_manager_thread from the
        # main Thread and avoid deadlocks caused by permanently locked queues.
        self.thread_wakeup = executor._executor_manager_thread_wakeup

        # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
        # to determine if the ProcessPoolExecutor has been garbage collected
        # and that the manager can exit.
        # When the executor gets garbage collected, the weakref callback
        # will wake up the queue management thread so that it can terminate
        # if there is no pending work item.
        def weakref_cb(_, thread_wakeup=self.thread_wakeup):
            mp.util.debug('Executor collected: triggering callback for'
                          ' QueueManager wakeup')
            thread_wakeup.wakeup()

        self.executor_reference = weakref.ref(executor, weakref_cb)

        # A list of the ctx.Process instances used as workers.
        self.processes = executor._processes

        # A ctx.Queue that will be filled with _CallItems derived from
        # _WorkItems for processing by the process workers.
        self.call_queue = executor._call_queue

        # A ctx.SimpleQueue of _ResultItems generated by the process workers.
        self.result_queue = executor._result_queue

        # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        self.work_ids_queue = executor._work_ids

        # A dict mapping work ids to _WorkItems e.g.
        #     {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        # This is the very same dict object as the executor's; it must be
        # mutated in place and never rebound.
        self.pending_work_items = executor._pending_work_items

        # Set this thread to be daemonized
        super().__init__()
        self.daemon = True

    def run(self):
        # Main loop for the executor manager thread.

        while True:
            self.add_call_item_to_queue()

            result_item, is_broken, cause = self.wait_result_broken_or_wakeup()

            if is_broken:
                self.terminate_broken(cause)
                return
            if result_item is not None:
                self.process_result_item(result_item)
                # Delete reference to result_item to avoid keeping references
                # while waiting on new results.
                del result_item

            if self.is_shutting_down():
                self.flag_executor_shutting_down()

                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not self.pending_work_items:
                    self.join_executor_internals()
                    return

    def add_call_item_to_queue(self):
        # Fills call_queue with _WorkItems from pending_work_items.
        # Returns as soon as the call queue is full or there is no work id
        # left to read.
        while True:
            if self.call_queue.full():
                return
            try:
                work_id = self.work_ids_queue.get(block=False)
            except queue.Empty:
                return
            else:
                work_item = self.pending_work_items[work_id]

                if work_item.future.set_running_or_notify_cancel():
                    self.call_queue.put(_CallItem(work_id,
                                                  work_item.fn,
                                                  work_item.args,
                                                  work_item.kwargs),
                                        block=True)
                else:
                    # The future was cancelled before it started running.
                    del self.pending_work_items[work_id]
                    continue

    def wait_result_broken_or_wakeup(self):
        # Wait for a result to be ready in the result_queue while checking
        # that all worker processes are still running, or for a wake up
        # signal to be sent. The wake up signals come either from new tasks
        # being submitted, from the executor being shutdown/gc-ed, or from the
        # shutdown of the python interpreter.
        #
        # Returns a (result_item, is_broken, cause) tuple.
        result_reader = self.result_queue._reader
        wakeup_reader = self.thread_wakeup._reader
        readers = [result_reader, wakeup_reader]
        worker_sentinels = [p.sentinel for p in self.processes.values()]
        ready = mp.connection.wait(readers + worker_sentinels)

        # Unless a result or a wakeup is available, what became ready is a
        # worker sentinel, i.e. a worker process exited: the pool is broken.
        cause = None
        is_broken = True
        result_item = None
        if result_reader in ready:
            try:
                result_item = result_reader.recv()
                is_broken = False
            except BaseException as e:
                cause = traceback.format_exception(type(e), e, e.__traceback__)

        elif wakeup_reader in ready:
            is_broken = False
        self.thread_wakeup.clear()

        return result_item, is_broken, cause

    def process_result_item(self, result_item):
        # Process the received result_item. This can be either the PID of a
        # worker that exited gracefully or a _ResultItem

        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert self.is_shutting_down()
            p = self.processes.pop(result_item)
            p.join()
            if not self.processes:
                self.join_executor_internals()
                return
        else:
            # Received a _ResultItem so mark the future as completed.
            work_item = self.pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)

    def is_shutting_down(self):
        # Check whether we should start shutting down the executor.
        executor = self.executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        return (_global_shutdown or executor is None
                or executor._shutdown_thread)

    def terminate_broken(self, cause):
        # Terminate the executor because it is in a broken state. The cause
        # argument can be used to display more information on the error that
        # led the executor into becoming broken.

        # Mark the process pool broken so that submits fail right now.
        executor = self.executor_reference()
        if executor is not None:
            executor._broken = ('A child process terminated '
                                'abruptly, the process pool is not '
                                'usable anymore')
            executor._shutdown_thread = True
            executor = None

        # All pending tasks are to be marked failed with the following
        # BrokenProcessPool error
        bpe = BrokenProcessPool("A process in the process pool was "
                                "terminated abruptly while the future was "
                                "running or pending.")
        if cause is not None:
            bpe.__cause__ = _RemoteTraceback(
                f"\n'''\n{''.join(cause)}'''")

        # Mark pending tasks as failed.
        for work_item in self.pending_work_items.values():
            try:
                work_item.future.set_exception(bpe)
            except _base.InvalidStateError:
                # set_exception() fails if the future was cancelled while it
                # was still pending: ignore it, the future is done anyway.
                # Checking for cancellation first would leave a race between
                # the check and set_exception().
                pass
            # Delete references to object. See issue16284
            del work_item
        self.pending_work_items.clear()

        # Terminate remaining workers forcibly: the queues or their
        # locks may be in a dirty state and block forever.
        for p in self.processes.values():
            p.terminate()

        # clean up resources
        self.join_executor_internals()

    def flag_executor_shutting_down(self):
        # Flag the executor as shutting down and cancel remaining tasks if
        # requested as early as possible if it is not gc-ed yet.
        executor = self.executor_reference()
        if executor is not None:
            executor._shutdown_thread = True
            # Cancel pending work items if requested.
            if executor._cancel_pending_futures:
                # Cancel all pending futures and remove them, so that
                # pending_work_items only has futures that are currently
                # running. The dict is shared with the executor and with the
                # call queue, so it is updated in place (iterating over a
                # snapshot) instead of being replaced by a new dict.
                for work_id, work_item in list(
                        self.pending_work_items.items()):
                    if work_item.future.cancel():
                        self.pending_work_items.pop(work_id, None)
                # Drain work_ids_queue since we no longer need to
                # add items to the call queue.
                while True:
                    try:
                        self.work_ids_queue.get_nowait()
                    except queue.Empty:
                        break
                # Make sure we do this only once to not waste time looping
                # on running processes over and over.
                executor._cancel_pending_futures = False

    def shutdown_workers(self):
        # Ask the workers to exit by putting one None sentinel per live
        # child on the call queue.
        n_children_to_stop = self.get_n_children_alive()
        n_sentinels_sent = 0
        # Send the right number of sentinels, to make sure all children are
        # properly terminated.
        while (n_sentinels_sent < n_children_to_stop
                and self.get_n_children_alive() > 0):
            for _ in range(n_children_to_stop - n_sentinels_sent):
                try:
                    self.call_queue.put_nowait(None)
                    n_sentinels_sent += 1
                except queue.Full:
                    break

    def join_executor_internals(self):
        # Stop the workers, then release the call queue, the wakeup pipe and
        # the worker processes.
        self.shutdown_workers()
        # Release the queue's resources as soon as possible.
        self.call_queue.close()
        self.call_queue.join_thread()
        self.thread_wakeup.close()
        # If .join() is not called on the created processes then
        # some ctx.Queue methods may deadlock on Mac OS X.
        for p in self.processes.values():
            p.join()

    def get_n_children_alive(self):
        # This is an upper bound on the number of children alive.
        return sum(p.is_alive() for p in self.processes.values())
Brian Quinlan81c4d362010-09-18 22:35:02 +0000515
Thomas Moreau94459fd2018-01-05 11:15:54 +0100516
# Cache for _check_system_limits(): whether the check has already run and,
# if the system turned out to be too limited, the error message to raise.
_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    """Raise NotImplementedError if the system provides too few semaphores.

    The limit is queried at most once; later calls reuse the cached outcome
    and either return immediately or raise the same error again.

    Raises:
        NotImplementedError: If os.sysconf("SC_SEM_NSEMS_MAX") reports a
            limit below 256.
    """
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
        # An earlier call already succeeded: do not query the system again.
        return
    _system_limits_checked = True
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # undetermined limit, assume that the limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores (%d"
                       " available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)
543
Antoine Pitroudd696492011-06-08 17:21:55 +0200544
def _chain_from_iterable_of_lists(iterable):
    """
    Yield the items of each list of *iterable*, in order.

    Specialized version of itertools.chain.from_iterable for lists: every
    list is reversed in place and then emptied by popping, so that no
    reference to an already yielded object is kept.
    """
    for batch in iterable:
        batch.reverse()
        while batch:
            yield batch.pop()
555
556
class BrokenProcessPool(_base.BrokenExecutor):
    """
    Error set on futures, and raised on new submissions, after a process of
    a ProcessPoolExecutor terminated abruptly.
    """
562
563
class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=()):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
            mp_context: A multiprocessing context to launch the workers. This
                object should provide SimpleQueue, Queue and Process.
            initializer: A callable used to initialize worker processes.
            initargs: A tuple of arguments to pass to the initializer.

        Raises:
            ValueError: If max_workers is not greater than 0 or, on Windows,
                is greater than _MAX_WINDOWS_WORKERS.
            TypeError: If initializer is neither None nor a callable.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.cpu_count() or 1
            if sys.platform == 'win32':
                self._max_workers = min(_MAX_WINDOWS_WORKERS,
                                        self._max_workers)
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")
            elif (sys.platform == 'win32' and
                    max_workers > _MAX_WINDOWS_WORKERS):
                raise ValueError(
                    f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")

            self._max_workers = max_workers

        if mp_context is None:
            mp_context = mp.get_context()
        self._mp_context = mp_context

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")
        self._initializer = initializer
        self._initargs = initargs

        # Management thread
        self._executor_manager_thread = None

        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._broken = False
        self._queue_count = 0
        self._pending_work_items = {}
        self._cancel_pending_futures = False

        # _ThreadWakeup is a communication channel used to interrupt the wait
        # of the main loop of executor_manager_thread from another thread (e.g.
        # when calling executor.submit or executor.shutdown). We do not use the
        # _result_queue to send wakeup signals to the executor_manager_thread
        # as it could result in a deadlock if a worker process dies with the
        # _result_queue write lock still acquired.
        self._executor_manager_thread_wakeup = _ThreadWakeup()

        # Create communication channels for the executor
        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = _SafeQueue(
            max_size=queue_size, ctx=self._mp_context,
            pending_work_items=self._pending_work_items,
            thread_wakeup=self._executor_manager_thread_wakeup)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = mp_context.SimpleQueue()
        self._work_ids = queue.Queue()

    def _start_executor_manager_thread(self):
        # Start the worker processes and the manager thread on first use.
        if self._executor_manager_thread is None:
            # Start the processes so that their sentinels are known.
            self._adjust_process_count()
            self._executor_manager_thread = _ExecutorManagerThread(self)
            self._executor_manager_thread.start()
            # Register the thread so that the exit handler can wake it up
            # and join it.
            _threads_wakeups[self._executor_manager_thread] = \
                self._executor_manager_thread_wakeup

    def _adjust_process_count(self):
        # Start worker processes until there are _max_workers of them.
        for _ in range(len(self._processes), self._max_workers):
            p = self._mp_context.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue,
                      self._initializer,
                      self._initargs))
            p.start()
            self._processes[p.pid] = p

    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool(self._broken)
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _global_shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._executor_manager_thread_wakeup.wakeup()

            self._start_executor_manager_thread()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a time.

        Returns:
            An iterator equivalent to: map(fn, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            ValueError: If chunksize is less than 1.
            Exception: If fn(*args) raises for any values.
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        results = super().map(partial(_process_chunk, fn),
                              _get_chunks(*iterables, chunksize=chunksize),
                              timeout=timeout)
        return _chain_from_iterable_of_lists(results)

    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._cancel_pending_futures = cancel_futures
            self._shutdown_thread = True
            # Read the attributes once, while holding the lock: they are
            # reset to None below, so reading them again later could observe
            # None if shutdown() is also running in another thread.
            manager_thread = self._executor_manager_thread
            thread_wakeup = self._executor_manager_thread_wakeup

        if manager_thread is not None:
            # Wake up queue management thread
            if thread_wakeup is not None:
                thread_wakeup.wakeup()
            if wait:
                manager_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._executor_manager_thread = None
        self._call_queue = None
        self._result_queue = None
        self._processes = None
        self._executor_manager_thread_wakeup = None

    shutdown.__doc__ = _base.Executor.shutdown.__doc__
735
# Have every manager thread woken up and joined when the interpreter exits.
atexit.register(_python_exit)