# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        |  <= | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  WorkItem from the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
"""
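
# Usage sketch (illustrative only, not exercised by this module): the flow
# described above is driven entirely through the public ProcessPoolExecutor
# API.  The `square` helper below is a made-up example; any picklable,
# importable callable behaves the same way.
#
#     from concurrent.futures import ProcessPoolExecutor
#
#     def square(x):
#         return x * x
#
#     if __name__ == '__main__':
#         with ProcessPoolExecutor(max_workers=4) as executor:
#             future = executor.submit(square, 7)  # -> a _WorkItem
#             print(future.result())               # 49, via "Result Q"
#             # map() batches calls; chunksize > 1 reduces IPC overhead.
#             print(list(executor.map(square, range(8), chunksize=2)))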

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

import atexit
import os
from concurrent.futures import _base
import queue
from queue import Full
import multiprocessing as mp
import multiprocessing.connection
from multiprocessing.queues import Queue
import threading
import weakref
from functools import partial
import itertools
import sys
import traceback

# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.

_threads_wakeups = weakref.WeakKeyDictionary()
_global_shutdown = False


class _ThreadWakeup:
    def __init__(self):
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        self._writer.close()
        self._reader.close()

    def wakeup(self):
        self._writer.send_bytes(b"")

    def clear(self):
        while self._reader.poll():
            self._reader.recv_bytes()

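# Illustrative sketch (not used by the implementation): the pipe above acts as
# a "self-pipe" wakeup channel.  A blocking multiprocessing.connection.wait()
# on the read end returns as soon as another thread calls wakeup(), e.g.
#
#     wakeup = _ThreadWakeup()
#     threading.Timer(1.0, wakeup.wakeup).start()
#     mp.connection.wait([wakeup._reader])   # returns after ~1 second
#     wakeup.clear()
#     wakeup.close()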

def _python_exit():
    global _global_shutdown
    _global_shutdown = True
    items = list(_threads_wakeups.items())
    for _, thread_wakeup in items:
        thread_wakeup.wakeup()
    for t, _ in items:
        t.join()

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1


# On Windows, WaitForMultipleObjects is used to wait for processes to finish.
# It can wait on, at most, 63 objects. There is an overhead of two objects:
# - the result queue reader
# - the thread wakeup reader
_MAX_WINDOWS_WORKERS = 63 - 2

# Hack to embed stringification of remote traceback in local traceback

class _RemoteTraceback(Exception):
    def __init__(self, tb):
        self.tb = tb
    def __str__(self):
        return self.tb

class _ExceptionWithTraceback:
    def __init__(self, exc, tb):
        tb = traceback.format_exception(type(exc), exc, tb)
        tb = ''.join(tb)
        self.exc = exc
        self.tb = '\n"""\n%s"""' % tb
    def __reduce__(self):
        return _rebuild_exc, (self.exc, self.tb)

def _rebuild_exc(exc, tb):
    exc.__cause__ = _RemoteTraceback(tb)
    return exc

class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

class _ResultItem(object):
    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result

class _CallItem(object):
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs


class _SafeQueue(Queue):
    """Safe Queue that sets the exception on the future linked to the job
    that triggered an error in the queue feeder thread."""
    def __init__(self, max_size=0, *, ctx, pending_work_items):
        self.pending_work_items = pending_work_items
        super().__init__(max_size, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        if isinstance(obj, _CallItem):
            tb = traceback.format_exception(type(e), e, e.__traceback__)
            e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
            work_item = self.pending_work_items.pop(obj.work_id, None)
            # work_item can be None if another process terminated. In this case,
            # the queue_manager_thread fails all work_items with BrokenProcessPool
            if work_item is not None:
                work_item.future.set_exception(e)
        else:
            super()._on_queue_feeder_error(e, obj)


def _get_chunks(*iterables, chunksize):
    """ Iterates over zip()ed iterables in chunks. """
    it = zip(*iterables)
    while True:
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield chunk

def _process_chunk(fn, chunk):
    """ Processes a chunk of an iterable passed to map.

    Runs the function passed to map() on a chunk of the
    iterable passed to map.

    This function is run in a separate process.

    """
    return [fn(*args) for args in chunk]
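
# Worked example (illustrative): with chunksize=2, the zipped iterables are
# sliced into tuples of argument tuples, and each chunk is evaluated in a
# single round trip to a worker process:
#
#     list(_get_chunks([1, 2, 3], [4, 5, 6], chunksize=2))
#     # -> [((1, 4), (2, 5)), ((3, 6),)]
#     _process_chunk(pow, ((2, 3), (3, 2)))
#     # -> [8, 9]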


def _sendback_result(result_queue, work_id, result=None, exception=None):
    """Safely send back the given result or exception"""
    try:
        result_queue.put(_ResultItem(work_id, result=result,
                                     exception=exception))
    except BaseException as e:
        exc = _ExceptionWithTraceback(e, e.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=exc))


def _process_worker(call_queue, result_queue, initializer, initargs):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A ctx.Queue of _ResultItems that will be written
            to by the worker.
        initializer: A callable initializer, or None
        initargs: A tuple of args for the initializer
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            # The parent will notice that the process stopped and
            # mark the pool broken
            return
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id, exception=exc)
        else:
            _sendback_result(result_queue, call_item.work_id, result=r)
            del r

        # Liberate the resource as soon as possible, to avoid holding onto
        # open files or shared memory that is not needed anymore
        del call_item


def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Fills call_queue with _WorkItems from pending_work_items.

    This function never blocks.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while True:
        if call_queue.full():
            return
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        else:
            work_item = pending_work_items[work_id]

            if work_item.future.set_running_or_notify_cancel():
                call_queue.put(_CallItem(work_id,
                                         work_item.fn,
                                         work_item.args,
                                         work_item.kwargs),
                               block=True)
            else:
                del pending_work_items[work_id]
                continue


def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue,
                             thread_wakeup):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A dict mapping pids to the ctx.Process instances used as
            workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A ctx.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A ctx.SimpleQueue of _ResultItems generated by the
            process workers.
        thread_wakeup: A _ThreadWakeup to allow waking up the
            queue_manager_thread from the main Thread and avoid deadlocks
            caused by permanently locked queues.
    """
    executor = None

    def shutting_down():
        return (_global_shutdown or executor is None
                or executor._shutdown_thread)

    def shutdown_worker():
        # This is an upper bound on the number of children alive.
        n_children_alive = sum(p.is_alive() for p in processes.values())
        n_children_to_stop = n_children_alive
        n_sentinels_sent = 0
        # Send the right number of sentinels, to make sure all children are
        # properly terminated.
        while n_sentinels_sent < n_children_to_stop and n_children_alive > 0:
            for i in range(n_children_to_stop - n_sentinels_sent):
                try:
                    call_queue.put_nowait(None)
                    n_sentinels_sent += 1
                except Full:
                    break
            n_children_alive = sum(p.is_alive() for p in processes.values())

        # Release the queue's resources as soon as possible.
        call_queue.close()
        # If .join() is not called on the created processes then
        # some ctx.Queue methods may deadlock on Mac OS X.
        for p in processes.values():
            p.join()

    result_reader = result_queue._reader
    wakeup_reader = thread_wakeup._reader
    readers = [result_reader, wakeup_reader]

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        # Wait for a result to be ready in the result_queue while checking
        # that all worker processes are still running, or for a wake-up signal
        # to be sent. The wake-up signals come either from new tasks being
        # submitted, from the executor being shutdown/gc-ed, or from the
        # shutdown of the python interpreter.
        worker_sentinels = [p.sentinel for p in processes.values()]
        ready = mp.connection.wait(readers + worker_sentinels)

        cause = None
        is_broken = True
        if result_reader in ready:
            try:
                result_item = result_reader.recv()
                is_broken = False
            except BaseException as e:
                cause = traceback.format_exception(type(e), e, e.__traceback__)

        elif wakeup_reader in ready:
            is_broken = False
            result_item = None
        thread_wakeup.clear()
        if is_broken:
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = ('A child process terminated '
                                    'abruptly, the process pool is not '
                                    'usable anymore')
                executor._shutdown_thread = True
                executor = None
            bpe = BrokenProcessPool("A process in the process pool was "
                                    "terminated abruptly while the future was "
                                    "running or pending.")
            if cause is not None:
                bpe.__cause__ = _RemoteTraceback(
                    f"\n'''\n{''.join(cause)}'''")
            # All futures in flight must be marked failed
            for work_id, work_item in pending_work_items.items():
                work_item.future.set_exception(bpe)
                # Delete references to object. See issue16284
                del work_item
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return
        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284
                del work_item
            # Delete reference to result_item
            del result_item

        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if shutting_down():
            try:
                # Flag the executor as shutting down as early as possible if it
                # is not gc-ed yet.
                if executor is not None:
                    executor._shutdown_thread = True
                # Unless there are pending work items, we have nothing to cancel.
                if pending_work_items and executor._cancel_pending_futures:
                    # Cancel all pending futures and update pending_work_items
                    # to only have futures that are currently running.
                    new_pending_work_items = {}
                    for work_id, work_item in pending_work_items.items():
                        if not work_item.future.cancel():
                            new_pending_work_items[work_id] = work_item

                    pending_work_items = new_pending_work_items
                    # Drain work_ids_queue since we no longer need to
                    # add items to the call queue.
                    while True:
                        try:
                            work_ids_queue.get_nowait()
                        except queue.Empty:
                            break

                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (in
                # result_queue.get()) and be able to send a sentinel again.
                pass
        executor = None


_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
    _system_limits_checked = True
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # undetermined limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores (%d"
                       " available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)


def _chain_from_iterable_of_lists(iterable):
    """
    Specialized implementation of itertools.chain.from_iterable.
    Each item in *iterable* should be a list.  This function is
    careful not to keep references to yielded objects.
    """
    for element in iterable:
        element.reverse()
        while element:
            yield element.pop()
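
# Worked example (illustrative): the generator empties each inner list as it
# is consumed, so no reference to already-yielded chunks is kept alive:
#
#     list(_chain_from_iterable_of_lists([[1, 2], [3]]))
#     # -> [1, 2, 3]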


class BrokenProcessPool(_base.BrokenExecutor):
    """
    Raised when a process in a ProcessPoolExecutor terminated abruptly
    while a future was in the running state.
    """
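
# Illustrative sketch: user code can catch this exception to detect that a
# worker died while a call was running or pending (the `future` name below
# stands in for any Future returned by submit()):
#
#     from concurrent.futures.process import BrokenProcessPool
#     try:
#         future.result()
#     except BrokenProcessPool:
#         ...  # the executor is broken; create a new one for further work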


class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=()):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
            mp_context: A multiprocessing context to launch the workers. This
                object should provide SimpleQueue, Queue and Process.
            initializer: A callable used to initialize worker processes.
            initargs: A tuple of arguments to pass to the initializer.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.cpu_count() or 1
            if sys.platform == 'win32':
                self._max_workers = min(_MAX_WINDOWS_WORKERS,
                                        self._max_workers)
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")
            elif (sys.platform == 'win32' and
                  max_workers > _MAX_WINDOWS_WORKERS):
                raise ValueError(
                    f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")

            self._max_workers = max_workers

        if mp_context is None:
            mp_context = mp.get_context()
        self._mp_context = mp_context

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")
        self._initializer = initializer
        self._initargs = initargs

        # Management thread
        self._queue_management_thread = None

        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._broken = False
        self._queue_count = 0
        self._pending_work_items = {}
        self._cancel_pending_futures = False

        # Create communication channels for the executor
        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = _SafeQueue(
            max_size=queue_size, ctx=self._mp_context,
            pending_work_items=self._pending_work_items)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = mp_context.SimpleQueue()
        self._work_ids = queue.Queue()

        # _ThreadWakeup is a communication channel used to interrupt the wait
        # of the main loop of queue_manager_thread from another thread (e.g.
        # when calling executor.submit or executor.shutdown). We do not use the
        # _result_queue to send the wakeup signal to the queue_manager_thread
        # as it could result in a deadlock if a worker process dies with the
        # _result_queue write lock still acquired.
        self._queue_management_thread_wakeup = _ThreadWakeup()

    def _start_queue_management_thread(self):
        if self._queue_management_thread is None:
            # When the executor gets garbage collected, the weakref callback
            # will wake up the queue management thread so that it can terminate
            # if there is no pending work item.
            def weakref_cb(_,
                           thread_wakeup=self._queue_management_thread_wakeup):
                mp.util.debug('Executor collected: triggering callback for'
                              ' QueueManager wakeup')
                thread_wakeup.wakeup()
            # Start the processes so that their sentinels are known.
            self._adjust_process_count()
            self._queue_management_thread = threading.Thread(
                target=_queue_management_worker,
                args=(weakref.ref(self, weakref_cb),
                      self._processes,
                      self._pending_work_items,
                      self._work_ids,
                      self._call_queue,
                      self._result_queue,
                      self._queue_management_thread_wakeup),
                name="QueueManagerThread")
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            _threads_wakeups[self._queue_management_thread] = \
                self._queue_management_thread_wakeup

    def _adjust_process_count(self):
        for _ in range(len(self._processes), self._max_workers):
            p = self._mp_context.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue,
                      self._initializer,
                      self._initargs))
            p.start()
            self._processes[p.pid] = p

    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool(self._broken)
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _global_shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._queue_management_thread_wakeup.wakeup()

            self._start_queue_management_thread()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, *iterables).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a time.

        Returns:
            An iterator equivalent to: map(fn, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        results = super().map(partial(_process_chunk, fn),
                              _get_chunks(*iterables, chunksize=chunksize),
                              timeout=timeout)
        return _chain_from_iterable_of_lists(results)

    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._cancel_pending_futures = cancel_futures
            self._shutdown_thread = True

        if self._queue_management_thread:
            # Wake up queue management thread
            self._queue_management_thread_wakeup.wakeup()
            if wait:
                self._queue_management_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        if self._call_queue is not None:
            self._call_queue.close()
            if wait:
                self._call_queue.join_thread()
            self._call_queue = None
        self._result_queue = None
        self._processes = None

        if self._queue_management_thread_wakeup:
            self._queue_management_thread_wakeup.close()
            self._queue_management_thread_wakeup = None

    shutdown.__doc__ = _base.Executor.shutdown.__doc__

atexit.register(_python_exit)