blob: 90bc98bf2ecd178eb2eb1801804cc1cb9bf8fd0c [file] [log] [blame]
Brian Quinlan81c4d362010-09-18 22:35:02 +00001# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4"""Implements ProcessPoolExecutor.
5
Thomas Graingerf938d8b2019-04-12 17:17:17 +01006The following diagram and text describe the data-flow through the system:
Brian Quinlan81c4d362010-09-18 22:35:02 +00007
8|======================= In-process =====================|== Out-of-process ==|
9
10+----------+ +----------+ +--------+ +-----------+ +---------+
Thomas Moreau94459fd2018-01-05 11:15:54 +010011| | => | Work Ids | | | | Call Q | | Process |
12| | +----------+ | | +-----------+ | Pool |
13| | | ... | | | | ... | +---------+
14| | | 6 | => | | => | 5, call() | => | |
Brian Quinlan81c4d362010-09-18 22:35:02 +000015| | | 7 | | | | ... | | |
16| Process | | ... | | Local | +-----------+ | Process |
17| Pool | +----------+ | Worker | | #1..n |
18| Executor | | Thread | | |
19| | +----------- + | | +-----------+ | |
20| | <=> | Work Items | <=> | | <= | Result Q | <= | |
21| | +------------+ | | +-----------+ | |
22| | | 6: call() | | | | ... | | |
23| | | future | | | | 4, result | | |
24| | | ... | | | | 3, except | | |
25+----------+ +------------+ +--------+ +-----------+ +---------+
26
27Executor.submit() called:
28- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
29- adds the id of the _WorkItem to the "Work Ids" queue
30
31Local worker thread:
32- reads work ids from the "Work Ids" queue and looks up the corresponding
33 WorkItem from the "Work Items" dict: if the work item has been cancelled then
34 it is simply removed from the dict, otherwise it is repackaged as a
35 _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
36 until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
37 calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
38- reads _ResultItems from "Result Q", updates the future stored in the
39 "Work Items" dict and deletes the dict entry
40
41Process #1..n:
42- reads _CallItems from "Call Q", executes the calls, and puts the resulting
Mark Dickinson5ee24042012-10-20 13:16:49 +010043 _ResultItems in "Result Q"
Brian Quinlan81c4d362010-09-18 22:35:02 +000044"""
45
46__author__ = 'Brian Quinlan (brian@sweetapp.com)'
47
Antoine Pitroudd696492011-06-08 17:21:55 +020048import os
Brian Quinlan81c4d362010-09-18 22:35:02 +000049from concurrent.futures import _base
50import queue
Thomas Moreaue8c368d2017-10-03 11:53:17 +020051import multiprocessing as mp
Brian Quinlanf7bda5c2019-05-07 13:31:11 -040052import multiprocessing.connection
Thomas Moreau94459fd2018-01-05 11:15:54 +010053from multiprocessing.queues import Queue
Brian Quinlan81c4d362010-09-18 22:35:02 +000054import threading
55import weakref
Antoine Pitrou4aae2762014-10-04 20:20:10 +020056from functools import partial
57import itertools
Brian Quinlan39889862019-05-08 14:04:53 -040058import sys
Antoine Pitrou1285c9b2015-01-17 20:02:14 +010059import traceback
Brian Quinlan81c4d362010-09-18 22:35:02 +000060
Brian Quinlan81c4d362010-09-18 22:35:02 +000061
Thomas Moreau94459fd2018-01-05 11:15:54 +010062_threads_wakeups = weakref.WeakKeyDictionary()
Thomas Moreaue8c368d2017-10-03 11:53:17 +020063_global_shutdown = False
Brian Quinlan81c4d362010-09-18 22:35:02 +000064
Thomas Moreau94459fd2018-01-05 11:15:54 +010065
66class _ThreadWakeup:
Thomas Moreau94459fd2018-01-05 11:15:54 +010067 def __init__(self):
Thomas Moreaua5cbab52020-02-16 19:09:26 +010068 self._closed = False
Thomas Moreau94459fd2018-01-05 11:15:54 +010069 self._reader, self._writer = mp.Pipe(duplex=False)
70
Thomas Moreau095ee412018-03-12 18:18:41 +010071 def close(self):
Thomas Moreaua5cbab52020-02-16 19:09:26 +010072 if not self._closed:
73 self._closed = True
74 self._writer.close()
75 self._reader.close()
Thomas Moreau095ee412018-03-12 18:18:41 +010076
Thomas Moreau94459fd2018-01-05 11:15:54 +010077 def wakeup(self):
Thomas Moreaua5cbab52020-02-16 19:09:26 +010078 if not self._closed:
79 self._writer.send_bytes(b"")
Thomas Moreau94459fd2018-01-05 11:15:54 +010080
81 def clear(self):
Thomas Moreaua5cbab52020-02-16 19:09:26 +010082 if not self._closed:
83 while self._reader.poll():
84 self._reader.recv_bytes()
Thomas Moreau94459fd2018-01-05 11:15:54 +010085
86
Brian Quinlan81c4d362010-09-18 22:35:02 +000087def _python_exit():
Thomas Moreaue8c368d2017-10-03 11:53:17 +020088 global _global_shutdown
89 _global_shutdown = True
Thomas Moreau94459fd2018-01-05 11:15:54 +010090 items = list(_threads_wakeups.items())
91 for _, thread_wakeup in items:
Victor Stinnera4dfe8e2020-04-29 03:32:06 +020092 # call not protected by ProcessPoolExecutor._shutdown_lock
Thomas Moreau94459fd2018-01-05 11:15:54 +010093 thread_wakeup.wakeup()
94 for t, _ in items:
Antoine Pitrouc13d4542011-03-26 19:29:44 +010095 t.join()
Brian Quinlan81c4d362010-09-18 22:35:02 +000096
Kyle Stanleyb61b8182020-03-27 15:31:22 -040097# Register for `_python_exit()` to be called just before joining all
98# non-daemon threads. This is used instead of `atexit.register()` for
99# compatibility with subinterpreters, which no longer support daemon threads.
100# See bpo-39812 for context.
101threading._register_atexit(_python_exit)
102
Brian Quinlan81c4d362010-09-18 22:35:02 +0000103# Controls how many more calls than processes will be queued in the call queue.
104# A smaller number will mean that processes spend more time idle waiting for
105# work while a larger number will make Future.cancel() succeed less frequently
106# (Futures in the call queue cannot be cancelled).
107EXTRA_QUEUED_CALLS = 1
108
Thomas Moreau94459fd2018-01-05 11:15:54 +0100109
Brian Quinlan39889862019-05-08 14:04:53 -0400110# On Windows, WaitForMultipleObjects is used to wait for processes to finish.
111# It can wait on, at most, 63 objects. There is an overhead of two objects:
112# - the result queue reader
113# - the thread wakeup reader
114_MAX_WINDOWS_WORKERS = 63 - 2
115
Antoine Pitrou1285c9b2015-01-17 20:02:14 +0100116# Hack to embed stringification of remote traceback in local traceback
117
118class _RemoteTraceback(Exception):
119 def __init__(self, tb):
120 self.tb = tb
121 def __str__(self):
122 return self.tb
123
124class _ExceptionWithTraceback:
125 def __init__(self, exc, tb):
126 tb = traceback.format_exception(type(exc), exc, tb)
127 tb = ''.join(tb)
128 self.exc = exc
129 self.tb = '\n"""\n%s"""' % tb
130 def __reduce__(self):
131 return _rebuild_exc, (self.exc, self.tb)
132
133def _rebuild_exc(exc, tb):
134 exc.__cause__ = _RemoteTraceback(tb)
135 return exc
136
Brian Quinlan81c4d362010-09-18 22:35:02 +0000137class _WorkItem(object):
138 def __init__(self, future, fn, args, kwargs):
139 self.future = future
140 self.fn = fn
141 self.args = args
142 self.kwargs = kwargs
143
144class _ResultItem(object):
145 def __init__(self, work_id, exception=None, result=None):
146 self.work_id = work_id
147 self.exception = exception
148 self.result = result
149
150class _CallItem(object):
151 def __init__(self, work_id, fn, args, kwargs):
152 self.work_id = work_id
153 self.fn = fn
154 self.args = args
155 self.kwargs = kwargs
156
Antoine Pitrou63ff4132017-11-04 11:05:49 +0100157
Thomas Moreau94459fd2018-01-05 11:15:54 +0100158class _SafeQueue(Queue):
159 """Safe Queue set exception to the future object linked to a job"""
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200160 def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
161 thread_wakeup):
Thomas Moreau94459fd2018-01-05 11:15:54 +0100162 self.pending_work_items = pending_work_items
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200163 self.shutdown_lock = shutdown_lock
Thomas Moreaua5cbab52020-02-16 19:09:26 +0100164 self.thread_wakeup = thread_wakeup
Thomas Moreau94459fd2018-01-05 11:15:54 +0100165 super().__init__(max_size, ctx=ctx)
166
167 def _on_queue_feeder_error(self, e, obj):
168 if isinstance(obj, _CallItem):
169 tb = traceback.format_exception(type(e), e, e.__traceback__)
170 e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
171 work_item = self.pending_work_items.pop(obj.work_id, None)
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200172 with self.shutdown_lock:
173 self.thread_wakeup.wakeup()
Thomas Moreau0e890762020-03-01 21:49:14 +0100174 # work_item can be None if another process terminated. In this
175 # case, the executor_manager_thread fails all work_items
176 # with BrokenProcessPool
Thomas Moreau94459fd2018-01-05 11:15:54 +0100177 if work_item is not None:
178 work_item.future.set_exception(e)
179 else:
180 super()._on_queue_feeder_error(e, obj)
181
182
Antoine Pitrou4aae2762014-10-04 20:20:10 +0200183def _get_chunks(*iterables, chunksize):
184 """ Iterates over zip()ed iterables in chunks. """
185 it = zip(*iterables)
186 while True:
187 chunk = tuple(itertools.islice(it, chunksize))
188 if not chunk:
189 return
190 yield chunk
191
Thomas Moreau0e890762020-03-01 21:49:14 +0100192
Antoine Pitrou4aae2762014-10-04 20:20:10 +0200193def _process_chunk(fn, chunk):
194 """ Processes a chunk of an iterable passed to map.
195
196 Runs the function passed to map() on a chunk of the
197 iterable passed to map.
198
199 This function is run in a separate process.
200
201 """
202 return [fn(*args) for args in chunk]
203
Thomas Moreau94459fd2018-01-05 11:15:54 +0100204
205def _sendback_result(result_queue, work_id, result=None, exception=None):
206 """Safely send back the given result or exception"""
207 try:
208 result_queue.put(_ResultItem(work_id, result=result,
209 exception=exception))
210 except BaseException as e:
211 exc = _ExceptionWithTraceback(e, e.__traceback__)
212 result_queue.put(_ResultItem(work_id, exception=exc))
213
214
Antoine Pitrou63ff4132017-11-04 11:05:49 +0100215def _process_worker(call_queue, result_queue, initializer, initargs):
Brian Quinlan81c4d362010-09-18 22:35:02 +0000216 """Evaluates calls from call_queue and places the results in result_queue.
217
Georg Brandlfb1720b2010-12-09 18:08:43 +0000218 This worker is run in a separate process.
Brian Quinlan81c4d362010-09-18 22:35:02 +0000219
220 Args:
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200221 call_queue: A ctx.Queue of _CallItems that will be read and
Brian Quinlan81c4d362010-09-18 22:35:02 +0000222 evaluated by the worker.
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200223 result_queue: A ctx.Queue of _ResultItems that will written
Brian Quinlan81c4d362010-09-18 22:35:02 +0000224 to by the worker.
Antoine Pitrou63ff4132017-11-04 11:05:49 +0100225 initializer: A callable initializer, or None
226 initargs: A tuple of args for the initializer
Brian Quinlan81c4d362010-09-18 22:35:02 +0000227 """
Antoine Pitrou63ff4132017-11-04 11:05:49 +0100228 if initializer is not None:
229 try:
230 initializer(*initargs)
231 except BaseException:
232 _base.LOGGER.critical('Exception in initializer:', exc_info=True)
233 # The parent will notice that the process stopped and
234 # mark the pool broken
235 return
Brian Quinlan81c4d362010-09-18 22:35:02 +0000236 while True:
Antoine Pitrou27be5da2011-04-12 17:48:46 +0200237 call_item = call_queue.get(block=True)
238 if call_item is None:
239 # Wake up queue management thread
Antoine Pitroudd696492011-06-08 17:21:55 +0200240 result_queue.put(os.getpid())
Antoine Pitrou27be5da2011-04-12 17:48:46 +0200241 return
Brian Quinlan81c4d362010-09-18 22:35:02 +0000242 try:
Antoine Pitrou27be5da2011-04-12 17:48:46 +0200243 r = call_item.fn(*call_item.args, **call_item.kwargs)
244 except BaseException as e:
Antoine Pitrou1285c9b2015-01-17 20:02:14 +0100245 exc = _ExceptionWithTraceback(e, e.__traceback__)
Thomas Moreau94459fd2018-01-05 11:15:54 +0100246 _sendback_result(result_queue, call_item.work_id, exception=exc)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000247 else:
Thomas Moreau94459fd2018-01-05 11:15:54 +0100248 _sendback_result(result_queue, call_item.work_id, result=r)
Dave Chevell962bdea2019-03-17 09:28:51 +1100249 del r
Brian Quinlan81c4d362010-09-18 22:35:02 +0000250
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200251 # Liberate the resource as soon as possible, to avoid holding onto
252 # open files or shared memory that is not needed anymore
253 del call_item
254
255
Thomas Moreau0e890762020-03-01 21:49:14 +0100256class _ExecutorManagerThread(threading.Thread):
Brian Quinlan81c4d362010-09-18 22:35:02 +0000257 """Manages the communication between this process and the worker processes.
258
Thomas Moreau0e890762020-03-01 21:49:14 +0100259 The manager is run in a local thread.
Brian Quinlan81c4d362010-09-18 22:35:02 +0000260
261 Args:
Thomas Moreau0e890762020-03-01 21:49:14 +0100262 executor: A reference to the ProcessPoolExecutor that owns
263 this thread. A weakref will be own by the manager as well as
264 references to internal objects used to introspect the state of
265 the executor.
Brian Quinlan81c4d362010-09-18 22:35:02 +0000266 """
Antoine Pitrou020436b2011-07-02 21:20:25 +0200267
Thomas Moreau0e890762020-03-01 21:49:14 +0100268 def __init__(self, executor):
269 # Store references to necessary internals of the executor.
Antoine Pitroudd696492011-06-08 17:21:55 +0200270
Thomas Moreau0e890762020-03-01 21:49:14 +0100271 # A _ThreadWakeup to allow waking up the queue_manager_thread from the
272 # main Thread and avoid deadlocks caused by permanently locked queues.
273 self.thread_wakeup = executor._executor_manager_thread_wakeup
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200274 self.shutdown_lock = executor._shutdown_lock
Thomas Moreau94459fd2018-01-05 11:15:54 +0100275
Thomas Moreau0e890762020-03-01 21:49:14 +0100276 # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
277 # to determine if the ProcessPoolExecutor has been garbage collected
278 # and that the manager can exit.
279 # When the executor gets garbage collected, the weakref callback
280 # will wake up the queue management thread so that it can terminate
281 # if there is no pending work item.
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200282 def weakref_cb(_,
283 thread_wakeup=self.thread_wakeup,
284 shutdown_lock=self.shutdown_lock):
Thomas Moreau0e890762020-03-01 21:49:14 +0100285 mp.util.debug('Executor collected: triggering callback for'
286 ' QueueManager wakeup')
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200287 with shutdown_lock:
288 thread_wakeup.wakeup()
Antoine Pitroudd696492011-06-08 17:21:55 +0200289
Thomas Moreau0e890762020-03-01 21:49:14 +0100290 self.executor_reference = weakref.ref(executor, weakref_cb)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +0100291
Thomas Moreau0e890762020-03-01 21:49:14 +0100292 # A list of the ctx.Process instances used as workers.
293 self.processes = executor._processes
Brian Quinlan81c4d362010-09-18 22:35:02 +0000294
Thomas Moreau0e890762020-03-01 21:49:14 +0100295 # A ctx.Queue that will be filled with _CallItems derived from
296 # _WorkItems for processing by the process workers.
297 self.call_queue = executor._call_queue
298
299 # A ctx.SimpleQueue of _ResultItems generated by the process workers.
300 self.result_queue = executor._result_queue
301
302 # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
303 self.work_ids_queue = executor._work_ids
304
305 # A dict mapping work ids to _WorkItems e.g.
306 # {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
307 self.pending_work_items = executor._pending_work_items
308
Thomas Moreau0e890762020-03-01 21:49:14 +0100309 super().__init__()
Thomas Moreau0e890762020-03-01 21:49:14 +0100310
311 def run(self):
312 # Main loop for the executor manager thread.
313
314 while True:
315 self.add_call_item_to_queue()
316
317 result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
318
319 if is_broken:
320 self.terminate_broken(cause)
321 return
322 if result_item is not None:
323 self.process_result_item(result_item)
324 # Delete reference to result_item to avoid keeping references
325 # while waiting on new results.
326 del result_item
327
Kyle Stanley1ac6e372020-04-19 10:00:59 -0400328 # attempt to increment idle process count
329 executor = self.executor_reference()
330 if executor is not None:
331 executor._idle_worker_semaphore.release()
332 del executor
333
Thomas Moreau0e890762020-03-01 21:49:14 +0100334 if self.is_shutting_down():
335 self.flag_executor_shutting_down()
336
337 # Since no new work items can be added, it is safe to shutdown
338 # this thread if there are no pending work items.
339 if not self.pending_work_items:
340 self.join_executor_internals()
341 return
342
343 def add_call_item_to_queue(self):
344 # Fills call_queue with _WorkItems from pending_work_items.
345 # This function never blocks.
346 while True:
347 if self.call_queue.full():
348 return
349 try:
350 work_id = self.work_ids_queue.get(block=False)
351 except queue.Empty:
352 return
353 else:
354 work_item = self.pending_work_items[work_id]
355
356 if work_item.future.set_running_or_notify_cancel():
357 self.call_queue.put(_CallItem(work_id,
358 work_item.fn,
359 work_item.args,
360 work_item.kwargs),
361 block=True)
362 else:
363 del self.pending_work_items[work_id]
364 continue
365
366 def wait_result_broken_or_wakeup(self):
Thomas Moreau94459fd2018-01-05 11:15:54 +0100367 # Wait for a result to be ready in the result_queue while checking
368 # that all worker processes are still running, or for a wake up
369 # signal send. The wake up signals come either from new tasks being
370 # submitted, from the executor being shutdown/gc-ed, or from the
371 # shutdown of the python interpreter.
Thomas Moreau0e890762020-03-01 21:49:14 +0100372 result_reader = self.result_queue._reader
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200373 assert not self.thread_wakeup._closed
Thomas Moreau0e890762020-03-01 21:49:14 +0100374 wakeup_reader = self.thread_wakeup._reader
375 readers = [result_reader, wakeup_reader]
376 worker_sentinels = [p.sentinel for p in self.processes.values()]
Brian Quinlanf7bda5c2019-05-07 13:31:11 -0400377 ready = mp.connection.wait(readers + worker_sentinels)
Thomas Moreau94459fd2018-01-05 11:15:54 +0100378
379 cause = None
380 is_broken = True
Thomas Moreau0e890762020-03-01 21:49:14 +0100381 result_item = None
Thomas Moreau94459fd2018-01-05 11:15:54 +0100382 if result_reader in ready:
383 try:
384 result_item = result_reader.recv()
385 is_broken = False
386 except BaseException as e:
387 cause = traceback.format_exception(type(e), e, e.__traceback__)
388
389 elif wakeup_reader in ready:
390 is_broken = False
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200391
392 with self.shutdown_lock:
393 self.thread_wakeup.clear()
Thomas Moreau0e890762020-03-01 21:49:14 +0100394
395 return result_item, is_broken, cause
396
397 def process_result_item(self, result_item):
398 # Process the received a result_item. This can be either the PID of a
399 # worker that exited gracefully or a _ResultItem
400
Antoine Pitroudd696492011-06-08 17:21:55 +0200401 if isinstance(result_item, int):
402 # Clean shutdown of a worker using its PID
403 # (avoids marking the executor broken)
Thomas Moreau0e890762020-03-01 21:49:14 +0100404 assert self.is_shutting_down()
405 p = self.processes.pop(result_item)
Antoine Pitroud06a0652011-07-16 01:13:34 +0200406 p.join()
Thomas Moreau0e890762020-03-01 21:49:14 +0100407 if not self.processes:
408 self.join_executor_internals()
Antoine Pitrou020436b2011-07-02 21:20:25 +0200409 return
Thomas Moreau0e890762020-03-01 21:49:14 +0100410 else:
411 # Received a _ResultItem so mark the future as completed.
412 work_item = self.pending_work_items.pop(result_item.work_id, None)
Antoine Pitroudd696492011-06-08 17:21:55 +0200413 # work_item can be None if another process terminated (see above)
414 if work_item is not None:
415 if result_item.exception:
416 work_item.future.set_exception(result_item.exception)
417 else:
418 work_item.future.set_result(result_item.result)
Thomas Moreau94459fd2018-01-05 11:15:54 +0100419
Thomas Moreau0e890762020-03-01 21:49:14 +0100420 def is_shutting_down(self):
421 # Check whether we should start shutting down the executor.
422 executor = self.executor_reference()
Antoine Pitrouc13d4542011-03-26 19:29:44 +0100423 # No more work items can be added if:
424 # - The interpreter is shutting down OR
425 # - The executor that owns this worker has been collected OR
426 # - The executor that owns this worker has been shutdown.
Thomas Moreau0e890762020-03-01 21:49:14 +0100427 return (_global_shutdown or executor is None
428 or executor._shutdown_thread)
Kyle Stanley339fd462020-02-02 07:49:00 -0500429
Thomas Moreau0e890762020-03-01 21:49:14 +0100430 def terminate_broken(self, cause):
431 # Terminate the executor because it is in a broken state. The cause
432 # argument can be used to display more information on the error that
433 # lead the executor into becoming broken.
Kyle Stanley339fd462020-02-02 07:49:00 -0500434
Thomas Moreau0e890762020-03-01 21:49:14 +0100435 # Mark the process pool broken so that submits fail right now.
436 executor = self.executor_reference()
437 if executor is not None:
438 executor._broken = ('A child process terminated '
439 'abruptly, the process pool is not '
440 'usable anymore')
441 executor._shutdown_thread = True
442 executor = None
443
444 # All pending tasks are to be marked failed with the following
445 # BrokenProcessPool error
446 bpe = BrokenProcessPool("A process in the process pool was "
447 "terminated abruptly while the future was "
448 "running or pending.")
449 if cause is not None:
450 bpe.__cause__ = _RemoteTraceback(
451 f"\n'''\n{''.join(cause)}'''")
452
453 # Mark pending tasks as failed.
454 for work_id, work_item in self.pending_work_items.items():
455 work_item.future.set_exception(bpe)
456 # Delete references to object. See issue16284
457 del work_item
458 self.pending_work_items.clear()
459
460 # Terminate remaining workers forcibly: the queues or their
461 # locks may be in a dirty state and block forever.
462 for p in self.processes.values():
463 p.terminate()
464
465 # clean up resources
466 self.join_executor_internals()
467
468 def flag_executor_shutting_down(self):
469 # Flag the executor as shutting down and cancel remaining tasks if
470 # requested as early as possible if it is not gc-ed yet.
471 executor = self.executor_reference()
472 if executor is not None:
473 executor._shutdown_thread = True
474 # Cancel pending work items if requested.
475 if executor._cancel_pending_futures:
476 # Cancel all pending futures and update pending_work_items
477 # to only have futures that are currently running.
478 new_pending_work_items = {}
479 for work_id, work_item in self.pending_work_items.items():
480 if not work_item.future.cancel():
481 new_pending_work_items[work_id] = work_item
482 self.pending_work_items = new_pending_work_items
483 # Drain work_ids_queue since we no longer need to
484 # add items to the call queue.
485 while True:
486 try:
487 self.work_ids_queue.get_nowait()
488 except queue.Empty:
489 break
490 # Make sure we do this only once to not waste time looping
491 # on running processes over and over.
492 executor._cancel_pending_futures = False
493
494 def shutdown_workers(self):
495 n_children_to_stop = self.get_n_children_alive()
496 n_sentinels_sent = 0
497 # Send the right number of sentinels, to make sure all children are
498 # properly terminated.
499 while (n_sentinels_sent < n_children_to_stop
500 and self.get_n_children_alive() > 0):
501 for i in range(n_children_to_stop - n_sentinels_sent):
502 try:
503 self.call_queue.put_nowait(None)
504 n_sentinels_sent += 1
505 except queue.Full:
506 break
507
508 def join_executor_internals(self):
509 self.shutdown_workers()
510 # Release the queue's resources as soon as possible.
511 self.call_queue.close()
512 self.call_queue.join_thread()
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200513 with self.shutdown_lock:
514 self.thread_wakeup.close()
Thomas Moreau0e890762020-03-01 21:49:14 +0100515 # If .join() is not called on the created processes then
516 # some ctx.Queue methods may deadlock on Mac OS X.
517 for p in self.processes.values():
518 p.join()
519
520 def get_n_children_alive(self):
521 # This is an upper bound on the number of children alive.
522 return sum(p.is_alive() for p in self.processes.values())
Brian Quinlan81c4d362010-09-18 22:35:02 +0000523
Thomas Moreau94459fd2018-01-05 11:15:54 +0100524
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000525_system_limits_checked = False
526_system_limited = None
Thomas Moreau94459fd2018-01-05 11:15:54 +0100527
528
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000529def _check_system_limits():
530 global _system_limits_checked, _system_limited
531 if _system_limits_checked:
532 if _system_limited:
533 raise NotImplementedError(_system_limited)
534 _system_limits_checked = True
535 try:
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000536 nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
537 except (AttributeError, ValueError):
538 # sysconf not available or setting not available
539 return
540 if nsems_max == -1:
Ezio Melottib5bc3532013-08-17 16:11:40 +0300541 # indetermined limit, assume that limit is determined
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000542 # by available memory only
543 return
544 if nsems_max >= 256:
545 # minimum number of semaphores available
546 # according to POSIX
547 return
Thomas Moreau94459fd2018-01-05 11:15:54 +0100548 _system_limited = ("system provides too few semaphores (%d"
549 " available, 256 necessary)" % nsems_max)
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000550 raise NotImplementedError(_system_limited)
551
Antoine Pitroudd696492011-06-08 17:21:55 +0200552
Grzegorz Grzywacz97e1b1c2017-09-01 18:54:00 +0200553def _chain_from_iterable_of_lists(iterable):
554 """
555 Specialized implementation of itertools.chain.from_iterable.
556 Each item in *iterable* should be a list. This function is
557 careful not to keep references to yielded objects.
558 """
559 for element in iterable:
560 element.reverse()
561 while element:
562 yield element.pop()
563
564
Antoine Pitrou63ff4132017-11-04 11:05:49 +0100565class BrokenProcessPool(_base.BrokenExecutor):
Antoine Pitroudd696492011-06-08 17:21:55 +0200566 """
567 Raised when a process in a ProcessPoolExecutor terminated abruptly
568 while a future was in the running state.
569 """
570
571
Brian Quinlan81c4d362010-09-18 22:35:02 +0000572class ProcessPoolExecutor(_base.Executor):
Antoine Pitrou63ff4132017-11-04 11:05:49 +0100573 def __init__(self, max_workers=None, mp_context=None,
574 initializer=None, initargs=()):
Brian Quinlan81c4d362010-09-18 22:35:02 +0000575 """Initializes a new ProcessPoolExecutor instance.
576
577 Args:
578 max_workers: The maximum number of processes that can be used to
579 execute the given calls. If None or not given then as many
580 worker processes will be created as the machine has processors.
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200581 mp_context: A multiprocessing context to launch the workers. This
582 object should provide SimpleQueue, Queue and Process.
ubordignon552ace72019-06-15 13:43:10 +0200583 initializer: A callable used to initialize worker processes.
Antoine Pitrou63ff4132017-11-04 11:05:49 +0100584 initargs: A tuple of arguments to pass to the initializer.
Brian Quinlan81c4d362010-09-18 22:35:02 +0000585 """
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000586 _check_system_limits()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000587
588 if max_workers is None:
Charles-François Natali37cfb0a2013-06-28 19:25:45 +0200589 self._max_workers = os.cpu_count() or 1
Brian Quinlan39889862019-05-08 14:04:53 -0400590 if sys.platform == 'win32':
591 self._max_workers = min(_MAX_WINDOWS_WORKERS,
592 self._max_workers)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000593 else:
Brian Quinlan20efceb2014-05-17 13:51:10 -0700594 if max_workers <= 0:
595 raise ValueError("max_workers must be greater than 0")
Brian Quinlan39889862019-05-08 14:04:53 -0400596 elif (sys.platform == 'win32' and
597 max_workers > _MAX_WINDOWS_WORKERS):
598 raise ValueError(
599 f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")
Brian Quinlan20efceb2014-05-17 13:51:10 -0700600
Brian Quinlan81c4d362010-09-18 22:35:02 +0000601 self._max_workers = max_workers
Thomas Moreau94459fd2018-01-05 11:15:54 +0100602
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200603 if mp_context is None:
604 mp_context = mp.get_context()
605 self._mp_context = mp_context
Brian Quinlan81c4d362010-09-18 22:35:02 +0000606
Antoine Pitrou63ff4132017-11-04 11:05:49 +0100607 if initializer is not None and not callable(initializer):
608 raise TypeError("initializer must be a callable")
609 self._initializer = initializer
610 self._initargs = initargs
611
Thomas Moreau94459fd2018-01-05 11:15:54 +0100612 # Management thread
Thomas Moreau0e890762020-03-01 21:49:14 +0100613 self._executor_manager_thread = None
Thomas Moreau94459fd2018-01-05 11:15:54 +0100614
Antoine Pitroudd696492011-06-08 17:21:55 +0200615 # Map of pids to processes
616 self._processes = {}
Brian Quinlan81c4d362010-09-18 22:35:02 +0000617
618 # Shutdown is a two-step process.
619 self._shutdown_thread = False
Brian Quinlan81c4d362010-09-18 22:35:02 +0000620 self._shutdown_lock = threading.Lock()
Kyle Stanley1ac6e372020-04-19 10:00:59 -0400621 self._idle_worker_semaphore = threading.Semaphore(0)
Antoine Pitroudd696492011-06-08 17:21:55 +0200622 self._broken = False
Brian Quinlan81c4d362010-09-18 22:35:02 +0000623 self._queue_count = 0
624 self._pending_work_items = {}
Kyle Stanley339fd462020-02-02 07:49:00 -0500625 self._cancel_pending_futures = False
Brian Quinlan81c4d362010-09-18 22:35:02 +0000626
Thomas Moreau94459fd2018-01-05 11:15:54 +0100627 # _ThreadWakeup is a communication channel used to interrupt the wait
Thomas Moreau0e890762020-03-01 21:49:14 +0100628 # of the main loop of executor_manager_thread from another thread (e.g.
Thomas Moreau94459fd2018-01-05 11:15:54 +0100629 # when calling executor.submit or executor.shutdown). We do not use the
Thomas Moreau0e890762020-03-01 21:49:14 +0100630 # _result_queue to send wakeup signals to the executor_manager_thread
Thomas Moreau94459fd2018-01-05 11:15:54 +0100631 # as it could result in a deadlock if a worker process dies with the
632 # _result_queue write lock still acquired.
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200633 #
634 # _shutdown_lock must be locked to access _ThreadWakeup.
Thomas Moreau0e890762020-03-01 21:49:14 +0100635 self._executor_manager_thread_wakeup = _ThreadWakeup()
Thomas Moreau94459fd2018-01-05 11:15:54 +0100636
Thomas Moreaua5cbab52020-02-16 19:09:26 +0100637 # Create communication channels for the executor
638 # Make the call queue slightly larger than the number of processes to
639 # prevent the worker processes from idling. But don't make it too big
640 # because futures in the call queue cannot be cancelled.
641 queue_size = self._max_workers + EXTRA_QUEUED_CALLS
642 self._call_queue = _SafeQueue(
643 max_size=queue_size, ctx=self._mp_context,
644 pending_work_items=self._pending_work_items,
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200645 shutdown_lock=self._shutdown_lock,
Thomas Moreau0e890762020-03-01 21:49:14 +0100646 thread_wakeup=self._executor_manager_thread_wakeup)
Thomas Moreaua5cbab52020-02-16 19:09:26 +0100647 # Killed worker processes can produce spurious "broken pipe"
648 # tracebacks in the queue's own worker thread. But we detect killed
649 # processes anyway, so silence the tracebacks.
650 self._call_queue._ignore_epipe = True
651 self._result_queue = mp_context.SimpleQueue()
652 self._work_ids = queue.Queue()
653
Thomas Moreau0e890762020-03-01 21:49:14 +0100654 def _start_executor_manager_thread(self):
655 if self._executor_manager_thread is None:
Antoine Pitroudd696492011-06-08 17:21:55 +0200656 # Start the processes so that their sentinels are known.
Thomas Moreau0e890762020-03-01 21:49:14 +0100657 self._executor_manager_thread = _ExecutorManagerThread(self)
658 self._executor_manager_thread.start()
659 _threads_wakeups[self._executor_manager_thread] = \
660 self._executor_manager_thread_wakeup
Brian Quinlan81c4d362010-09-18 22:35:02 +0000661
662 def _adjust_process_count(self):
Kyle Stanley1ac6e372020-04-19 10:00:59 -0400663 # if there's an idle process, we don't need to spawn a new one.
664 if self._idle_worker_semaphore.acquire(blocking=False):
665 return
666
667 process_count = len(self._processes)
668 if process_count < self._max_workers:
Thomas Moreaue8c368d2017-10-03 11:53:17 +0200669 p = self._mp_context.Process(
670 target=_process_worker,
671 args=(self._call_queue,
Antoine Pitrou63ff4132017-11-04 11:05:49 +0100672 self._result_queue,
673 self._initializer,
674 self._initargs))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000675 p.start()
Antoine Pitroudd696492011-06-08 17:21:55 +0200676 self._processes[p.pid] = p
Brian Quinlan81c4d362010-09-18 22:35:02 +0000677
Serhiy Storchaka142566c2019-06-05 18:22:31 +0300678 def submit(self, fn, /, *args, **kwargs):
Brian Quinlan81c4d362010-09-18 22:35:02 +0000679 with self._shutdown_lock:
Antoine Pitroudd696492011-06-08 17:21:55 +0200680 if self._broken:
Antoine Pitrou63ff4132017-11-04 11:05:49 +0100681 raise BrokenProcessPool(self._broken)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000682 if self._shutdown_thread:
683 raise RuntimeError('cannot schedule new futures after shutdown')
Mark Nemecc4b695f2018-04-10 18:23:14 +0100684 if _global_shutdown:
685 raise RuntimeError('cannot schedule new futures after '
686 'interpreter shutdown')
Brian Quinlan81c4d362010-09-18 22:35:02 +0000687
688 f = _base.Future()
689 w = _WorkItem(f, fn, args, kwargs)
690
691 self._pending_work_items[self._queue_count] = w
692 self._work_ids.put(self._queue_count)
693 self._queue_count += 1
Antoine Pitrouc13d4542011-03-26 19:29:44 +0100694 # Wake up queue management thread
Thomas Moreau0e890762020-03-01 21:49:14 +0100695 self._executor_manager_thread_wakeup.wakeup()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000696
Kyle Stanley1ac6e372020-04-19 10:00:59 -0400697 self._adjust_process_count()
Thomas Moreau0e890762020-03-01 21:49:14 +0100698 self._start_executor_manager_thread()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000699 return f
700 submit.__doc__ = _base.Executor.submit.__doc__
701
Antoine Pitrou4aae2762014-10-04 20:20:10 +0200702 def map(self, fn, *iterables, timeout=None, chunksize=1):
Martin Panterd2ad5712015-11-02 04:20:33 +0000703 """Returns an iterator equivalent to map(fn, iter).
Antoine Pitrou4aae2762014-10-04 20:20:10 +0200704
705 Args:
706 fn: A callable that will take as many arguments as there are
707 passed iterables.
708 timeout: The maximum number of seconds to wait. If None, then there
709 is no limit on the wait time.
710 chunksize: If greater than one, the iterables will be chopped into
711 chunks of size chunksize and submitted to the process pool.
712 If set to one, the items in the list will be sent one at a time.
713
714 Returns:
715 An iterator equivalent to: map(func, *iterables) but the calls may
716 be evaluated out-of-order.
717
718 Raises:
719 TimeoutError: If the entire result iterator could not be generated
720 before the given timeout.
721 Exception: If fn(*args) raises for any values.
722 """
723 if chunksize < 1:
724 raise ValueError("chunksize must be >= 1.")
725
726 results = super().map(partial(_process_chunk, fn),
727 _get_chunks(*iterables, chunksize=chunksize),
728 timeout=timeout)
Grzegorz Grzywacz97e1b1c2017-09-01 18:54:00 +0200729 return _chain_from_iterable_of_lists(results)
Antoine Pitrou4aae2762014-10-04 20:20:10 +0200730
Kyle Stanley339fd462020-02-02 07:49:00 -0500731 def shutdown(self, wait=True, *, cancel_futures=False):
Brian Quinlan81c4d362010-09-18 22:35:02 +0000732 with self._shutdown_lock:
Kyle Stanley339fd462020-02-02 07:49:00 -0500733 self._cancel_pending_futures = cancel_futures
Brian Quinlan81c4d362010-09-18 22:35:02 +0000734 self._shutdown_thread = True
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200735 if self._executor_manager_thread_wakeup is not None:
736 # Wake up queue management thread
737 self._executor_manager_thread_wakeup.wakeup()
Kyle Stanley339fd462020-02-02 07:49:00 -0500738
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200739 if self._executor_manager_thread is not None and wait:
740 self._executor_manager_thread.join()
Ezio Melottib5bc3532013-08-17 16:11:40 +0300741 # To reduce the risk of opening too many files, remove references to
Brian Quinlan81c4d362010-09-18 22:35:02 +0000742 # objects that use file descriptors.
Thomas Moreau0e890762020-03-01 21:49:14 +0100743 self._executor_manager_thread = None
Thomas Moreaua5cbab52020-02-16 19:09:26 +0100744 self._call_queue = None
Victor Stinner1a275012020-04-27 20:53:37 +0200745 if self._result_queue is not None and wait:
746 self._result_queue.close()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000747 self._result_queue = None
Brian Quinlan81c4d362010-09-18 22:35:02 +0000748 self._processes = None
Victor Stinnera4dfe8e2020-04-29 03:32:06 +0200749 self._executor_manager_thread_wakeup = None
Thomas Moreau095ee412018-03-12 18:18:41 +0100750
Brian Quinlan81c4d362010-09-18 22:35:02 +0000751 shutdown.__doc__ = _base.Executor.shutdown.__doc__