blob: a76e2c9cf231aec68786eee0f21cb7f75d99072f [file] [log] [blame]
Brian Quinlan81c4d362010-09-18 22:35:02 +00001# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +------------+     +--------+     +-----------+    +---------+
|          |  => | Work Ids   |     |        |     | Call Q    |    | Process |
|          |     +------------+     |        |     +-----------+    |  Pool   |
|          |     | ...        |     |        |     | ...       |    +---------+
|          |     | 6          |  => |        |  => | 5, call() | => |         |
|          |     | 7          |     |        |     | ...       |    |         |
| Process  |     | ...        |     | Local  |     +-----------+    | Process |
|  Pool    |     +------------+     | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread (the _ExecutorManagerThread):
- reads work ids from the "Work Ids" queue and looks up the corresponding
  WorkItem from the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
"""

__author__ = 'Brian Quinlan (brian@sweetapp.com)'
47
48import atexit
Antoine Pitroudd696492011-06-08 17:21:55 +020049import os
Brian Quinlan81c4d362010-09-18 22:35:02 +000050from concurrent.futures import _base
51import queue
Thomas Moreaue8c368d2017-10-03 11:53:17 +020052import multiprocessing as mp
Brian Quinlanf7bda5c2019-05-07 13:31:11 -040053import multiprocessing.connection
Thomas Moreau94459fd2018-01-05 11:15:54 +010054from multiprocessing.queues import Queue
Brian Quinlan81c4d362010-09-18 22:35:02 +000055import threading
56import weakref
Antoine Pitrou4aae2762014-10-04 20:20:10 +020057from functools import partial
58import itertools
Brian Quinlan39889862019-05-08 14:04:53 -040059import sys
Antoine Pitrou1285c9b2015-01-17 20:02:14 +010060import traceback
Brian Quinlan81c4d362010-09-18 22:35:02 +000061
Brian Quinlan81c4d362010-09-18 22:35:02 +000062
# Maps each started executor manager thread to its _ThreadWakeup so that
# _python_exit() can wake up and join every manager thread at interpreter
# shutdown. Because the keys are weak, an entry disappears once its thread
# object is garbage collected.
_threads_wakeups = weakref.WeakKeyDictionary()
# Set to True by _python_exit(). Once set, submit() refuses new work and
# manager threads treat the executor as shutting down.
_global_shutdown = False
Brian Quinlan81c4d362010-09-18 22:35:02 +000065
Thomas Moreau94459fd2018-01-05 11:15:54 +010066
67class _ThreadWakeup:
Thomas Moreau94459fd2018-01-05 11:15:54 +010068 def __init__(self):
Thomas Moreaua5cbab52020-02-16 19:09:26 +010069 self._closed = False
Thomas Moreau94459fd2018-01-05 11:15:54 +010070 self._reader, self._writer = mp.Pipe(duplex=False)
71
Thomas Moreau095ee412018-03-12 18:18:41 +010072 def close(self):
Thomas Moreaua5cbab52020-02-16 19:09:26 +010073 if not self._closed:
74 self._closed = True
75 self._writer.close()
76 self._reader.close()
Thomas Moreau095ee412018-03-12 18:18:41 +010077
Thomas Moreau94459fd2018-01-05 11:15:54 +010078 def wakeup(self):
Thomas Moreaua5cbab52020-02-16 19:09:26 +010079 if not self._closed:
80 self._writer.send_bytes(b"")
Thomas Moreau94459fd2018-01-05 11:15:54 +010081
82 def clear(self):
Thomas Moreaua5cbab52020-02-16 19:09:26 +010083 if not self._closed:
84 while self._reader.poll():
85 self._reader.recv_bytes()
Thomas Moreau94459fd2018-01-05 11:15:54 +010086
87
def _python_exit():
    """Tell every executor manager thread that the interpreter is exiting.

    Sets the module flag ``_global_shutdown``, wakes each registered manager
    thread through its _ThreadWakeup, and then joins all of them.
    """
    global _global_shutdown
    _global_shutdown = True
    # Snapshot the registry so both passes below see the same threads.
    registered = list(_threads_wakeups.items())
    # First wake every thread, then wait for them one by one.
    for _thread, channel in registered:
        # call not protected by ProcessPoolExecutor._shutdown_lock
        channel.wakeup()
    for thread, _channel in registered:
        thread.join()
Brian Quinlan81c4d362010-09-18 22:35:02 +000097
# Register for `_python_exit()` to be called just before joining all
# non-daemon threads. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context. Note that `threading._register_atexit` is a
# private (underscore-prefixed) hook of the `threading` module.
threading._register_atexit(_python_exit)
103
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
# ProcessPoolExecutor sizes its call queue as max_workers + EXTRA_QUEUED_CALLS.
EXTRA_QUEUED_CALLS = 1


# On Windows, WaitForMultipleObjects is used to wait for processes to finish.
# It can wait on, at most, 63 objects. There is an overhead of two objects:
# - the result queue reader
# - the thread wakeup reader
# ProcessPoolExecutor caps (or rejects) max_workers above this on Windows.
_MAX_WINDOWS_WORKERS = 63 - 2
116
Antoine Pitrou1285c9b2015-01-17 20:02:14 +0100117# Hack to embed stringification of remote traceback in local traceback
118
119class _RemoteTraceback(Exception):
120 def __init__(self, tb):
121 self.tb = tb
122 def __str__(self):
123 return self.tb
124
125class _ExceptionWithTraceback:
126 def __init__(self, exc, tb):
127 tb = traceback.format_exception(type(exc), exc, tb)
128 tb = ''.join(tb)
129 self.exc = exc
130 self.tb = '\n"""\n%s"""' % tb
131 def __reduce__(self):
132 return _rebuild_exc, (self.exc, self.tb)
133
134def _rebuild_exc(exc, tb):
135 exc.__cause__ = _RemoteTraceback(tb)
136 return exc
137
Brian Quinlan81c4d362010-09-18 22:35:02 +0000138class _WorkItem(object):
139 def __init__(self, future, fn, args, kwargs):
140 self.future = future
141 self.fn = fn
142 self.args = args
143 self.kwargs = kwargs
144
145class _ResultItem(object):
146 def __init__(self, work_id, exception=None, result=None):
147 self.work_id = work_id
148 self.exception = exception
149 self.result = result
150
151class _CallItem(object):
152 def __init__(self, work_id, fn, args, kwargs):
153 self.work_id = work_id
154 self.fn = fn
155 self.args = args
156 self.kwargs = kwargs
157
Antoine Pitrou63ff4132017-11-04 11:05:49 +0100158
Thomas Moreau94459fd2018-01-05 11:15:54 +0100159class _SafeQueue(Queue):
160 """Safe Queue set exception to the future object linked to a job"""
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200161 def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
162 thread_wakeup):
Thomas Moreau94459fd2018-01-05 11:15:54 +0100163 self.pending_work_items = pending_work_items
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200164 self.shutdown_lock = shutdown_lock
Thomas Moreaua5cbab52020-02-16 19:09:26 +0100165 self.thread_wakeup = thread_wakeup
Thomas Moreau94459fd2018-01-05 11:15:54 +0100166 super().__init__(max_size, ctx=ctx)
167
168 def _on_queue_feeder_error(self, e, obj):
169 if isinstance(obj, _CallItem):
170 tb = traceback.format_exception(type(e), e, e.__traceback__)
171 e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
172 work_item = self.pending_work_items.pop(obj.work_id, None)
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200173 with self.shutdown_lock:
174 self.thread_wakeup.wakeup()
Thomas Moreau0e890762020-03-01 21:49:14 +0100175 # work_item can be None if another process terminated. In this
176 # case, the executor_manager_thread fails all work_items
177 # with BrokenProcessPool
Thomas Moreau94459fd2018-01-05 11:15:54 +0100178 if work_item is not None:
179 work_item.future.set_exception(e)
180 else:
181 super()._on_queue_feeder_error(e, obj)
182
183
Antoine Pitrou4aae2762014-10-04 20:20:10 +0200184def _get_chunks(*iterables, chunksize):
185 """ Iterates over zip()ed iterables in chunks. """
186 it = zip(*iterables)
187 while True:
188 chunk = tuple(itertools.islice(it, chunksize))
189 if not chunk:
190 return
191 yield chunk
192
Thomas Moreau0e890762020-03-01 21:49:14 +0100193
Antoine Pitrou4aae2762014-10-04 20:20:10 +0200194def _process_chunk(fn, chunk):
195 """ Processes a chunk of an iterable passed to map.
196
197 Runs the function passed to map() on a chunk of the
198 iterable passed to map.
199
200 This function is run in a separate process.
201
202 """
203 return [fn(*args) for args in chunk]
204
Thomas Moreau94459fd2018-01-05 11:15:54 +0100205
def _sendback_result(result_queue, work_id, result=None, exception=None):
    """Put a _ResultItem for *work_id* on *result_queue*.

    If putting the item fails (for instance, the result cannot be sent),
    the failure itself is wrapped with its traceback and sent instead, so
    the caller's future still completes.
    """
    try:
        result_queue.put(
            _ResultItem(work_id, result=result, exception=exception))
    except BaseException as send_error:
        wrapped = _ExceptionWithTraceback(send_error,
                                          send_error.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=wrapped))
214
215
def _process_worker(call_queue, result_queue, initializer, initargs):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process. It exits when it reads the
    None sentinel from call_queue, after putting its PID on result_queue.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A ctx.SimpleQueue that _ResultItems (and, on exit,
            this worker's PID) will be written to.
        initializer: A callable initializer, or None
        initargs: A tuple of args for the initializer
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            # The parent will notice that the process stopped and
            # mark the pool broken
            return

    while True:
        job = call_queue.get(block=True)
        if job is None:
            # Sentinel: report our PID so the manager thread wakes up and
            # can tell this worker exited cleanly.
            result_queue.put(os.getpid())
            return

        try:
            outcome = job.fn(*job.args, **job.kwargs)
        except BaseException as err:
            wrapped = _ExceptionWithTraceback(err, err.__traceback__)
            _sendback_result(result_queue, job.work_id, exception=wrapped)
        else:
            _sendback_result(result_queue, job.work_id, result=outcome)
            del outcome

        # Liberate the resource as soon as possible, to avoid holding onto
        # open files or shared memory that is not needed anymore
        del job
255
256
Thomas Moreau0e890762020-03-01 21:49:14 +0100257class _ExecutorManagerThread(threading.Thread):
Brian Quinlan81c4d362010-09-18 22:35:02 +0000258 """Manages the communication between this process and the worker processes.
259
Thomas Moreau0e890762020-03-01 21:49:14 +0100260 The manager is run in a local thread.
Brian Quinlan81c4d362010-09-18 22:35:02 +0000261
262 Args:
Thomas Moreau0e890762020-03-01 21:49:14 +0100263 executor: A reference to the ProcessPoolExecutor that owns
264 this thread. A weakref will be own by the manager as well as
265 references to internal objects used to introspect the state of
266 the executor.
Brian Quinlan81c4d362010-09-18 22:35:02 +0000267 """
Antoine Pitrou020436b2011-07-02 21:20:25 +0200268
Thomas Moreau0e890762020-03-01 21:49:14 +0100269 def __init__(self, executor):
270 # Store references to necessary internals of the executor.
Antoine Pitroudd696492011-06-08 17:21:55 +0200271
Thomas Moreau0e890762020-03-01 21:49:14 +0100272 # A _ThreadWakeup to allow waking up the queue_manager_thread from the
273 # main Thread and avoid deadlocks caused by permanently locked queues.
274 self.thread_wakeup = executor._executor_manager_thread_wakeup
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200275 self.shutdown_lock = executor._shutdown_lock
Thomas Moreau94459fd2018-01-05 11:15:54 +0100276
Thomas Moreau0e890762020-03-01 21:49:14 +0100277 # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
278 # to determine if the ProcessPoolExecutor has been garbage collected
279 # and that the manager can exit.
280 # When the executor gets garbage collected, the weakref callback
281 # will wake up the queue management thread so that it can terminate
282 # if there is no pending work item.
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200283 def weakref_cb(_,
284 thread_wakeup=self.thread_wakeup,
285 shutdown_lock=self.shutdown_lock):
Thomas Moreau0e890762020-03-01 21:49:14 +0100286 mp.util.debug('Executor collected: triggering callback for'
287 ' QueueManager wakeup')
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200288 with shutdown_lock:
289 thread_wakeup.wakeup()
Antoine Pitroudd696492011-06-08 17:21:55 +0200290
Thomas Moreau0e890762020-03-01 21:49:14 +0100291 self.executor_reference = weakref.ref(executor, weakref_cb)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +0100292
Thomas Moreau0e890762020-03-01 21:49:14 +0100293 # A list of the ctx.Process instances used as workers.
294 self.processes = executor._processes
Brian Quinlan81c4d362010-09-18 22:35:02 +0000295
Thomas Moreau0e890762020-03-01 21:49:14 +0100296 # A ctx.Queue that will be filled with _CallItems derived from
297 # _WorkItems for processing by the process workers.
298 self.call_queue = executor._call_queue
299
300 # A ctx.SimpleQueue of _ResultItems generated by the process workers.
301 self.result_queue = executor._result_queue
302
303 # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
304 self.work_ids_queue = executor._work_ids
305
306 # A dict mapping work ids to _WorkItems e.g.
307 # {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
308 self.pending_work_items = executor._pending_work_items
309
Thomas Moreau0e890762020-03-01 21:49:14 +0100310 super().__init__()
Thomas Moreau0e890762020-03-01 21:49:14 +0100311
312 def run(self):
313 # Main loop for the executor manager thread.
314
315 while True:
316 self.add_call_item_to_queue()
317
318 result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
319
320 if is_broken:
321 self.terminate_broken(cause)
322 return
323 if result_item is not None:
324 self.process_result_item(result_item)
325 # Delete reference to result_item to avoid keeping references
326 # while waiting on new results.
327 del result_item
328
Kyle Stanley1ac6e372020-04-19 10:00:59 -0400329 # attempt to increment idle process count
330 executor = self.executor_reference()
331 if executor is not None:
332 executor._idle_worker_semaphore.release()
333 del executor
334
Thomas Moreau0e890762020-03-01 21:49:14 +0100335 if self.is_shutting_down():
336 self.flag_executor_shutting_down()
337
338 # Since no new work items can be added, it is safe to shutdown
339 # this thread if there are no pending work items.
340 if not self.pending_work_items:
341 self.join_executor_internals()
342 return
343
344 def add_call_item_to_queue(self):
345 # Fills call_queue with _WorkItems from pending_work_items.
346 # This function never blocks.
347 while True:
348 if self.call_queue.full():
349 return
350 try:
351 work_id = self.work_ids_queue.get(block=False)
352 except queue.Empty:
353 return
354 else:
355 work_item = self.pending_work_items[work_id]
356
357 if work_item.future.set_running_or_notify_cancel():
358 self.call_queue.put(_CallItem(work_id,
359 work_item.fn,
360 work_item.args,
361 work_item.kwargs),
362 block=True)
363 else:
364 del self.pending_work_items[work_id]
365 continue
366
367 def wait_result_broken_or_wakeup(self):
Thomas Moreau94459fd2018-01-05 11:15:54 +0100368 # Wait for a result to be ready in the result_queue while checking
369 # that all worker processes are still running, or for a wake up
370 # signal send. The wake up signals come either from new tasks being
371 # submitted, from the executor being shutdown/gc-ed, or from the
372 # shutdown of the python interpreter.
Thomas Moreau0e890762020-03-01 21:49:14 +0100373 result_reader = self.result_queue._reader
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200374 assert not self.thread_wakeup._closed
Thomas Moreau0e890762020-03-01 21:49:14 +0100375 wakeup_reader = self.thread_wakeup._reader
376 readers = [result_reader, wakeup_reader]
377 worker_sentinels = [p.sentinel for p in self.processes.values()]
Brian Quinlanf7bda5c2019-05-07 13:31:11 -0400378 ready = mp.connection.wait(readers + worker_sentinels)
Thomas Moreau94459fd2018-01-05 11:15:54 +0100379
380 cause = None
381 is_broken = True
Thomas Moreau0e890762020-03-01 21:49:14 +0100382 result_item = None
Thomas Moreau94459fd2018-01-05 11:15:54 +0100383 if result_reader in ready:
384 try:
385 result_item = result_reader.recv()
386 is_broken = False
387 except BaseException as e:
388 cause = traceback.format_exception(type(e), e, e.__traceback__)
389
390 elif wakeup_reader in ready:
391 is_broken = False
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200392
393 with self.shutdown_lock:
394 self.thread_wakeup.clear()
Thomas Moreau0e890762020-03-01 21:49:14 +0100395
396 return result_item, is_broken, cause
397
398 def process_result_item(self, result_item):
399 # Process the received a result_item. This can be either the PID of a
400 # worker that exited gracefully or a _ResultItem
401
Antoine Pitroudd696492011-06-08 17:21:55 +0200402 if isinstance(result_item, int):
403 # Clean shutdown of a worker using its PID
404 # (avoids marking the executor broken)
Thomas Moreau0e890762020-03-01 21:49:14 +0100405 assert self.is_shutting_down()
406 p = self.processes.pop(result_item)
Antoine Pitroud06a0652011-07-16 01:13:34 +0200407 p.join()
Thomas Moreau0e890762020-03-01 21:49:14 +0100408 if not self.processes:
409 self.join_executor_internals()
Antoine Pitrou020436b2011-07-02 21:20:25 +0200410 return
Thomas Moreau0e890762020-03-01 21:49:14 +0100411 else:
412 # Received a _ResultItem so mark the future as completed.
413 work_item = self.pending_work_items.pop(result_item.work_id, None)
Antoine Pitroudd696492011-06-08 17:21:55 +0200414 # work_item can be None if another process terminated (see above)
415 if work_item is not None:
416 if result_item.exception:
417 work_item.future.set_exception(result_item.exception)
418 else:
419 work_item.future.set_result(result_item.result)
Thomas Moreau94459fd2018-01-05 11:15:54 +0100420
Thomas Moreau0e890762020-03-01 21:49:14 +0100421 def is_shutting_down(self):
422 # Check whether we should start shutting down the executor.
423 executor = self.executor_reference()
Antoine Pitrouc13d4542011-03-26 19:29:44 +0100424 # No more work items can be added if:
425 # - The interpreter is shutting down OR
426 # - The executor that owns this worker has been collected OR
427 # - The executor that owns this worker has been shutdown.
Thomas Moreau0e890762020-03-01 21:49:14 +0100428 return (_global_shutdown or executor is None
429 or executor._shutdown_thread)
Kyle Stanley339fd462020-02-02 07:49:00 -0500430
Thomas Moreau0e890762020-03-01 21:49:14 +0100431 def terminate_broken(self, cause):
432 # Terminate the executor because it is in a broken state. The cause
433 # argument can be used to display more information on the error that
434 # lead the executor into becoming broken.
Kyle Stanley339fd462020-02-02 07:49:00 -0500435
Thomas Moreau0e890762020-03-01 21:49:14 +0100436 # Mark the process pool broken so that submits fail right now.
437 executor = self.executor_reference()
438 if executor is not None:
439 executor._broken = ('A child process terminated '
440 'abruptly, the process pool is not '
441 'usable anymore')
442 executor._shutdown_thread = True
443 executor = None
444
445 # All pending tasks are to be marked failed with the following
446 # BrokenProcessPool error
447 bpe = BrokenProcessPool("A process in the process pool was "
448 "terminated abruptly while the future was "
449 "running or pending.")
450 if cause is not None:
451 bpe.__cause__ = _RemoteTraceback(
452 f"\n'''\n{''.join(cause)}'''")
453
454 # Mark pending tasks as failed.
455 for work_id, work_item in self.pending_work_items.items():
456 work_item.future.set_exception(bpe)
457 # Delete references to object. See issue16284
458 del work_item
459 self.pending_work_items.clear()
460
461 # Terminate remaining workers forcibly: the queues or their
462 # locks may be in a dirty state and block forever.
463 for p in self.processes.values():
464 p.terminate()
465
466 # clean up resources
467 self.join_executor_internals()
468
469 def flag_executor_shutting_down(self):
470 # Flag the executor as shutting down and cancel remaining tasks if
471 # requested as early as possible if it is not gc-ed yet.
472 executor = self.executor_reference()
473 if executor is not None:
474 executor._shutdown_thread = True
475 # Cancel pending work items if requested.
476 if executor._cancel_pending_futures:
477 # Cancel all pending futures and update pending_work_items
478 # to only have futures that are currently running.
479 new_pending_work_items = {}
480 for work_id, work_item in self.pending_work_items.items():
481 if not work_item.future.cancel():
482 new_pending_work_items[work_id] = work_item
483 self.pending_work_items = new_pending_work_items
484 # Drain work_ids_queue since we no longer need to
485 # add items to the call queue.
486 while True:
487 try:
488 self.work_ids_queue.get_nowait()
489 except queue.Empty:
490 break
491 # Make sure we do this only once to not waste time looping
492 # on running processes over and over.
493 executor._cancel_pending_futures = False
494
495 def shutdown_workers(self):
496 n_children_to_stop = self.get_n_children_alive()
497 n_sentinels_sent = 0
498 # Send the right number of sentinels, to make sure all children are
499 # properly terminated.
500 while (n_sentinels_sent < n_children_to_stop
501 and self.get_n_children_alive() > 0):
502 for i in range(n_children_to_stop - n_sentinels_sent):
503 try:
504 self.call_queue.put_nowait(None)
505 n_sentinels_sent += 1
506 except queue.Full:
507 break
508
509 def join_executor_internals(self):
510 self.shutdown_workers()
511 # Release the queue's resources as soon as possible.
512 self.call_queue.close()
513 self.call_queue.join_thread()
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200514 with self.shutdown_lock:
515 self.thread_wakeup.close()
Thomas Moreau0e890762020-03-01 21:49:14 +0100516 # If .join() is not called on the created processes then
517 # some ctx.Queue methods may deadlock on Mac OS X.
518 for p in self.processes.values():
519 p.join()
520
521 def get_n_children_alive(self):
522 # This is an upper bound on the number of children alive.
523 return sum(p.is_alive() for p in self.processes.values())
Brian Quinlan81c4d362010-09-18 22:35:02 +0000524
Thomas Moreau94459fd2018-01-05 11:15:54 +0100525
# Cached outcome of _check_system_limits(): whether the check already ran,
# and the error message if the system was found too limited.
_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    """Ensure the OS provides enough semaphores for a ProcessPoolExecutor.

    The outcome of the first call is cached in the module globals above.
    Later calls either re-raise the cached error or return immediately.

    Raises:
        NotImplementedError: if ``os.sysconf("SC_SEM_NSEMS_MAX")`` reports a
            limit below 256.
    """
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
        # Already checked and found sufficient: skip the sysconf query.
        return
    _system_limits_checked = True
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # indeterminate limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores (%d"
                       " available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)
552
Antoine Pitroudd696492011-06-08 17:21:55 +0200553
Grzegorz Grzywacz97e1b1c2017-09-01 18:54:00 +0200554def _chain_from_iterable_of_lists(iterable):
555 """
556 Specialized implementation of itertools.chain.from_iterable.
557 Each item in *iterable* should be a list. This function is
558 careful not to keep references to yielded objects.
559 """
560 for element in iterable:
561 element.reverse()
562 while element:
563 yield element.pop()
564
565
class BrokenProcessPool(_base.BrokenExecutor):
    """
    Raised when a worker process of a ProcessPoolExecutor terminated
    abruptly while a future was running or pending, and by submit() once
    the pool has been marked broken.
    """
571
572
Brian Quinlan81c4d362010-09-18 22:35:02 +0000573class ProcessPoolExecutor(_base.Executor):
Antoine Pitrou63ff4132017-11-04 11:05:49 +0100574 def __init__(self, max_workers=None, mp_context=None,
575 initializer=None, initargs=()):
Brian Quinlan81c4d362010-09-18 22:35:02 +0000576 """Initializes a new ProcessPoolExecutor instance.
577
578 Args:
579 max_workers: The maximum number of processes that can be used to
580 execute the given calls. If None or not given then as many
581 worker processes will be created as the machine has processors.
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200582 mp_context: A multiprocessing context to launch the workers. This
583 object should provide SimpleQueue, Queue and Process.
ubordignon552ace72019-06-15 13:43:10 +0200584 initializer: A callable used to initialize worker processes.
Antoine Pitrou63ff4132017-11-04 11:05:49 +0100585 initargs: A tuple of arguments to pass to the initializer.
Brian Quinlan81c4d362010-09-18 22:35:02 +0000586 """
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000587 _check_system_limits()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000588
589 if max_workers is None:
Charles-François Natali37cfb0a2013-06-28 19:25:45 +0200590 self._max_workers = os.cpu_count() or 1
Brian Quinlan39889862019-05-08 14:04:53 -0400591 if sys.platform == 'win32':
592 self._max_workers = min(_MAX_WINDOWS_WORKERS,
593 self._max_workers)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000594 else:
Brian Quinlan20efceb2014-05-17 13:51:10 -0700595 if max_workers <= 0:
596 raise ValueError("max_workers must be greater than 0")
Brian Quinlan39889862019-05-08 14:04:53 -0400597 elif (sys.platform == 'win32' and
598 max_workers > _MAX_WINDOWS_WORKERS):
599 raise ValueError(
600 f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")
Brian Quinlan20efceb2014-05-17 13:51:10 -0700601
Brian Quinlan81c4d362010-09-18 22:35:02 +0000602 self._max_workers = max_workers
Thomas Moreau94459fd2018-01-05 11:15:54 +0100603
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200604 if mp_context is None:
605 mp_context = mp.get_context()
606 self._mp_context = mp_context
Brian Quinlan81c4d362010-09-18 22:35:02 +0000607
Antoine Pitrou63ff4132017-11-04 11:05:49 +0100608 if initializer is not None and not callable(initializer):
609 raise TypeError("initializer must be a callable")
610 self._initializer = initializer
611 self._initargs = initargs
612
Thomas Moreau94459fd2018-01-05 11:15:54 +0100613 # Management thread
Thomas Moreau0e890762020-03-01 21:49:14 +0100614 self._executor_manager_thread = None
Thomas Moreau94459fd2018-01-05 11:15:54 +0100615
Antoine Pitroudd696492011-06-08 17:21:55 +0200616 # Map of pids to processes
617 self._processes = {}
Brian Quinlan81c4d362010-09-18 22:35:02 +0000618
619 # Shutdown is a two-step process.
620 self._shutdown_thread = False
Brian Quinlan81c4d362010-09-18 22:35:02 +0000621 self._shutdown_lock = threading.Lock()
Kyle Stanley1ac6e372020-04-19 10:00:59 -0400622 self._idle_worker_semaphore = threading.Semaphore(0)
Antoine Pitroudd696492011-06-08 17:21:55 +0200623 self._broken = False
Brian Quinlan81c4d362010-09-18 22:35:02 +0000624 self._queue_count = 0
625 self._pending_work_items = {}
Kyle Stanley339fd462020-02-02 07:49:00 -0500626 self._cancel_pending_futures = False
Brian Quinlan81c4d362010-09-18 22:35:02 +0000627
Thomas Moreau94459fd2018-01-05 11:15:54 +0100628 # _ThreadWakeup is a communication channel used to interrupt the wait
Thomas Moreau0e890762020-03-01 21:49:14 +0100629 # of the main loop of executor_manager_thread from another thread (e.g.
Thomas Moreau94459fd2018-01-05 11:15:54 +0100630 # when calling executor.submit or executor.shutdown). We do not use the
Thomas Moreau0e890762020-03-01 21:49:14 +0100631 # _result_queue to send wakeup signals to the executor_manager_thread
Thomas Moreau94459fd2018-01-05 11:15:54 +0100632 # as it could result in a deadlock if a worker process dies with the
633 # _result_queue write lock still acquired.
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200634 #
635 # _shutdown_lock must be locked to access _ThreadWakeup.
Thomas Moreau0e890762020-03-01 21:49:14 +0100636 self._executor_manager_thread_wakeup = _ThreadWakeup()
Thomas Moreau94459fd2018-01-05 11:15:54 +0100637
Thomas Moreaua5cbab52020-02-16 19:09:26 +0100638 # Create communication channels for the executor
639 # Make the call queue slightly larger than the number of processes to
640 # prevent the worker processes from idling. But don't make it too big
641 # because futures in the call queue cannot be cancelled.
642 queue_size = self._max_workers + EXTRA_QUEUED_CALLS
643 self._call_queue = _SafeQueue(
644 max_size=queue_size, ctx=self._mp_context,
645 pending_work_items=self._pending_work_items,
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200646 shutdown_lock=self._shutdown_lock,
Thomas Moreau0e890762020-03-01 21:49:14 +0100647 thread_wakeup=self._executor_manager_thread_wakeup)
Thomas Moreaua5cbab52020-02-16 19:09:26 +0100648 # Killed worker processes can produce spurious "broken pipe"
649 # tracebacks in the queue's own worker thread. But we detect killed
650 # processes anyway, so silence the tracebacks.
651 self._call_queue._ignore_epipe = True
652 self._result_queue = mp_context.SimpleQueue()
653 self._work_ids = queue.Queue()
654
Thomas Moreau0e890762020-03-01 21:49:14 +0100655 def _start_executor_manager_thread(self):
656 if self._executor_manager_thread is None:
Antoine Pitroudd696492011-06-08 17:21:55 +0200657 # Start the processes so that their sentinels are known.
Thomas Moreau0e890762020-03-01 21:49:14 +0100658 self._executor_manager_thread = _ExecutorManagerThread(self)
659 self._executor_manager_thread.start()
660 _threads_wakeups[self._executor_manager_thread] = \
661 self._executor_manager_thread_wakeup
Brian Quinlan81c4d362010-09-18 22:35:02 +0000662
def _adjust_process_count(self):
    # A successful non-blocking acquire on the idle-worker semaphore means
    # an idle process already exists, so no new one needs to be spawned.
    if self._idle_worker_semaphore.acquire(blocking=False):
        return

    # Never grow the pool beyond the configured number of workers.
    if len(self._processes) >= self._max_workers:
        return

    worker_args = (self._call_queue,
                   self._result_queue,
                   self._initializer,
                   self._initargs)
    proc = self._mp_context.Process(target=_process_worker, args=worker_args)
    proc.start()
    # Track the new worker by its pid.
    self._processes[proc.pid] = proc
Brian Quinlan81c4d362010-09-18 22:35:02 +0000678
def submit(self, fn, /, *args, **kwargs):
    with self._shutdown_lock:
        # Refuse new work once the pool is broken or shutting down.
        if self._broken:
            raise BrokenProcessPool(self._broken)
        if self._shutdown_thread:
            raise RuntimeError('cannot schedule new futures after shutdown')
        if _global_shutdown:
            raise RuntimeError('cannot schedule new futures after '
                               'interpreter shutdown')

        # Register the work item under the next id, then publish that id.
        work_id = self._queue_count
        future = _base.Future()
        self._pending_work_items[work_id] = _WorkItem(future, fn, args, kwargs)
        self._work_ids.put(work_id)
        self._queue_count = work_id + 1

        # Nudge the manager thread so it notices the newly queued id.
        self._executor_manager_thread_wakeup.wakeup()

        self._adjust_process_count()
        self._start_executor_manager_thread()
        return future


submit.__doc__ = _base.Executor.submit.__doc__
702
def map(self, fn, *iterables, timeout=None, chunksize=1):
    """Returns an iterator equivalent to map(fn, *iterables).

    Args:
        fn: A callable that will take as many arguments as there are
            passed iterables.
        *iterables: The iterables whose items are passed to fn as
            positional arguments, one item from each iterable per call.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        chunksize: If greater than one, the iterables will be chopped into
            chunks of size chunksize and submitted to the process pool.
            If set to one, the items in the iterables will be sent one at a
            time.

    Returns:
        An iterator equivalent to: map(func, *iterables) but the calls may
        be evaluated out-of-order.

    Raises:
        ValueError: If chunksize is less than 1 (raised immediately, before
            any work is submitted).
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
        Exception: If fn(*args) raises for any values.
    """
    # Validate eagerly so a bad chunksize fails before anything is queued.
    if chunksize < 1:
        raise ValueError("chunksize must be >= 1.")

    # Each task submitted by the base-class map() runs
    # _process_chunk(fn, chunk) (helper defined elsewhere in this module), so
    # `results` yields one entry per chunk rather than per item.
    results = super().map(partial(_process_chunk, fn),
                          _get_chunks(*iterables, chunksize=chunksize),
                          timeout=timeout)
    # Flatten the per-chunk results back into a single iterator of results.
    return _chain_from_iterable_of_lists(results)
Antoine Pitrou4aae2762014-10-04 20:20:10 +0200731
def shutdown(self, wait=True, *, cancel_futures=False):
    # Publish the shutdown request (and whether queued futures should be
    # cancelled) under the lock, then nudge the manager thread.
    with self._shutdown_lock:
        self._cancel_pending_futures = cancel_futures
        self._shutdown_thread = True
        wakeup = self._executor_manager_thread_wakeup
        if wakeup is not None:
            wakeup.wakeup()

    manager = self._executor_manager_thread
    if manager is not None and wait:
        manager.join()

    # Drop references to objects that hold file descriptors, so a
    # shut-down executor does not keep them open.
    self._executor_manager_thread = None
    self._call_queue = None
    result_queue = self._result_queue
    if result_queue is not None and wait:
        # Only close when we waited; otherwise just drop the reference.
        result_queue.close()
    self._result_queue = None
    self._processes = None
    self._executor_manager_thread_wakeup = None


shutdown.__doc__ = _base.Executor.shutdown.__doc__