blob: d77322831a6c6a58846a2624e9ed89ff2923f5a9 [file] [log] [blame]
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  WorkItem from the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
"""
45
46__author__ = 'Brian Quinlan (brian@sweetapp.com)'
47
48import atexit
Antoine Pitroudd696492011-06-08 17:21:55 +020049import os
Brian Quinlan81c4d362010-09-18 22:35:02 +000050from concurrent.futures import _base
51import queue
Richard Oudkerk1f2eaa92013-10-16 17:06:22 +010052from queue import Full
Thomas Moreaue8c368d2017-10-03 11:53:17 +020053import multiprocessing as mp
Brian Quinlanf7bda5c2019-05-07 13:31:11 -040054import multiprocessing.connection
Thomas Moreau94459fd2018-01-05 11:15:54 +010055from multiprocessing.queues import Queue
Brian Quinlan81c4d362010-09-18 22:35:02 +000056import threading
57import weakref
Antoine Pitrou4aae2762014-10-04 20:20:10 +020058from functools import partial
59import itertools
Brian Quinlan39889862019-05-08 14:04:53 -040060import sys
Antoine Pitrou1285c9b2015-01-17 20:02:14 +010061import traceback
Brian Quinlan81c4d362010-09-18 22:35:02 +000062
# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.

# Maps each queue management thread to the _ThreadWakeup used to interrupt its
# wait. Keys are held weakly, so an entry goes away with its thread object.
_threads_wakeups = weakref.WeakKeyDictionary()
# Flipped to True by the exit handler once interpreter shutdown has started.
_global_shutdown = False
Brian Quinlan81c4d362010-09-18 22:35:02 +000079
Thomas Moreau94459fd2018-01-05 11:15:54 +010080
class _ThreadWakeup:
    """One-way pipe used to interrupt a thread that is waiting on readers.

    The waiting thread includes ``self._reader`` in the set of connections it
    waits on; any other thread calls wakeup() to make that reader ready.
    Once close() has been called, every method becomes a no-op.
    """

    def __init__(self):
        self._closed = False
        # duplex=False yields a (read-only, write-only) connection pair.
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        """Close both pipe ends; calling it again does nothing."""
        if not self._closed:
            self._closed = True
            self._writer.close()
            self._reader.close()

    def wakeup(self):
        """Send an empty message so that the reader end becomes ready."""
        if not self._closed:
            self._writer.send_bytes(b"")

    def clear(self):
        """Drain every pending wakeup message from the reader end."""
        if not self._closed:
            while self._reader.poll():
                self._reader.recv_bytes()
Thomas Moreau94459fd2018-01-05 11:15:54 +0100100
101
def _python_exit():
    """Exit handler: flag global shutdown, then wake and join manager threads.

    A snapshot of the registry is taken first so that the weak-key mapping
    cannot change size while it is being walked.  Every thread is woken
    before any of them is joined.
    """
    global _global_shutdown
    _global_shutdown = True
    items = list(_threads_wakeups.items())
    for _, thread_wakeup in items:
        thread_wakeup.wakeup()
    for t, _ in items:
        t.join()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000110
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1
116
Thomas Moreau94459fd2018-01-05 11:15:54 +0100117
# On Windows, WaitForMultipleObjects is used to wait for processes to finish.
# It can wait on, at most, 63 objects. There is an overhead of two objects:
# - the result queue reader
# - the thread wakeup reader
_MAX_WINDOWS_WORKERS = 63 - 2
123
# Hack to embed stringification of remote traceback in local traceback

class _RemoteTraceback(Exception):
    """Exception whose string form is a pre-formatted traceback text.

    Args:
        tb: The already formatted traceback, as a string.
    """
    def __init__(self, tb):
        self.tb = tb
    def __str__(self):
        return self.tb
131
class _ExceptionWithTraceback:
    """Pairs an exception with the formatted text of its traceback.

    Pickling goes through __reduce__, so unpickling calls
    ``_rebuild_exc(exc, tb)`` and produces whatever that function returns
    rather than an instance of this class.

    Args:
        exc: The exception instance.
        tb: The traceback object associated with ``exc``.
    """
    def __init__(self, exc, tb):
        tb = traceback.format_exception(type(exc), exc, tb)
        tb = ''.join(tb)
        self.exc = exc
        # Wrap the text in triple quotes so it stands out when displayed.
        self.tb = '\n"""\n%s"""' % tb
    def __reduce__(self):
        return _rebuild_exc, (self.exc, self.tb)
140
def _rebuild_exc(exc, tb):
    """Attach the formatted traceback text ``tb`` as the cause of ``exc``.

    Returns:
        ``exc`` itself, with ``__cause__`` set to a _RemoteTraceback.
    """
    exc.__cause__ = _RemoteTraceback(tb)
    return exc
144
class _WorkItem(object):
    """A submitted call together with the future that will hold its outcome."""
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
151
class _ResultItem(object):
    """Outcome of one call: its work id plus an exception or a result."""
    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result
157
class _CallItem(object):
    """A call identified by its work id: the callable and its arguments."""
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
164
Antoine Pitrou63ff4132017-11-04 11:05:49 +0100165
class _SafeQueue(Queue):
    """Queue that reports feeder errors on the future of the affected job.

    Args:
        max_size: Maximum queue size, forwarded to the base Queue.
        ctx: Multiprocessing context, forwarded to the base Queue.
        pending_work_items: Dict mapping work ids to work items; the entry of
            a call item that could not be fed is removed from it.
        thread_wakeup: Object whose wakeup() is called after such a failure.
    """
    def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
        self.pending_work_items = pending_work_items
        self.thread_wakeup = thread_wakeup
        super().__init__(max_size, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        """Handle error ``e`` raised while feeding ``obj`` to the queue."""
        if isinstance(obj, _CallItem):
            tb = traceback.format_exception(type(e), e, e.__traceback__)
            e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
            work_item = self.pending_work_items.pop(obj.work_id, None)
            self.thread_wakeup.wakeup()
            # work_item can be None if another process terminated. In this case,
            # the queue_manager_thread fails all work_items with BrokenProcessPool
            if work_item is not None:
                work_item.future.set_exception(e)
        else:
            super()._on_queue_feeder_error(e, obj)
185
186
def _get_chunks(*iterables, chunksize):
    """Iterate over zip()ed iterables in chunks.

    Yields:
        Tuples holding up to ``chunksize`` items of ``zip(*iterables)``; only
        the last one may be shorter, and no empty tuple is ever yielded.
    """
    it = zip(*iterables)
    while True:
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield chunk
195
def _process_chunk(fn, chunk):
    """Process a chunk of an iterable passed to map.

    Runs the function passed to map() on a chunk of the
    iterable passed to map.

    This function is run in a separate process.

    Returns:
        A list with the result of ``fn(*args)`` for each ``args`` in ``chunk``.
    """
    return [fn(*args) for args in chunk]
206
Thomas Moreau94459fd2018-01-05 11:15:54 +0100207
def _sendback_result(result_queue, work_id, result=None, exception=None):
    """Safely send back the given result or exception.

    If putting the item on ``result_queue`` raises, the error raised by that
    attempt is sent back instead, as the exception for the same ``work_id``.
    """
    try:
        result_queue.put(_ResultItem(work_id, result=result,
                                     exception=exception))
    except BaseException as e:
        exc = _ExceptionWithTraceback(e, e.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=exc))
216
217
def _process_worker(call_queue, result_queue, initializer, initargs):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A ctx.Queue of _ResultItems that will be written
            to by the worker.
        initializer: A callable initializer, or None
        initargs: A tuple of args for the initializer
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            # The parent will notice that the process stopped and
            # mark the pool broken
            return
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # A None item is the request to stop. Put our pid on the result
            # queue to wake up the queue management thread.
            result_queue.put(os.getpid())
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id, exception=exc)
        else:
            _sendback_result(result_queue, call_item.work_id, result=r)
            del r

        # Liberate the resource as soon as possible, to avoid holding onto
        # open files or shared memory that is not needed anymore
        del call_item
257
258
def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Fills call_queue with _WorkItems from pending_work_items.

    Items are only put while call_queue reports that it is not full, and
    reading from work_ids is done without blocking.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while True:
        if call_queue.full():
            return
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        else:
            work_item = pending_work_items[work_id]

            if work_item.future.set_running_or_notify_cancel():
                call_queue.put(_CallItem(work_id,
                                         work_item.fn,
                                         work_item.args,
                                         work_item.kwargs),
                               block=True)
            else:
                # The future was cancelled before it could run: forget it.
                del pending_work_items[work_id]
294 continue
295
Thomas Moreau94459fd2018-01-05 11:15:54 +0100296
def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue,
                             thread_wakeup):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A dict mapping pids to the ctx.Process instances used as
            workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A ctx.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A ctx.SimpleQueue of _ResultItems generated by the
            process workers.
        thread_wakeup: A _ThreadWakeup to allow waking up the
            queue_manager_thread from the main Thread and avoid deadlocks
            caused by permanently locked queues.
    """
    # Strong reference to the executor. It is only held while checking for
    # shutdown and is reset to None at the end of every loop iteration.
    executor = None

    def shutting_down():
        return (_global_shutdown or executor is None
                or executor._shutdown_thread)

    def shutdown_worker():
        # This is an upper bound on the number of children alive.
        n_children_alive = sum(p.is_alive() for p in processes.values())
        n_children_to_stop = n_children_alive
        n_sentinels_sent = 0
        # Send the right number of sentinels, to make sure all children are
        # properly terminated.
        while n_sentinels_sent < n_children_to_stop and n_children_alive > 0:
            for _ in range(n_children_to_stop - n_sentinels_sent):
                try:
                    call_queue.put_nowait(None)
                    n_sentinels_sent += 1
                except Full:
                    break
            n_children_alive = sum(p.is_alive() for p in processes.values())

        # Release the queue's resources as soon as possible.
        call_queue.close()
        call_queue.join_thread()
        thread_wakeup.close()
        # If .join() is not called on the created processes then
        # some ctx.Queue methods may deadlock on Mac OS X.
        for p in processes.values():
            p.join()

    result_reader = result_queue._reader
    wakeup_reader = thread_wakeup._reader
    readers = [result_reader, wakeup_reader]

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        # Wait for a result to be ready in the result_queue while checking
        # that all worker processes are still running, or for a wake up
        # signal to be sent. The wake up signals come either from new tasks
        # being submitted, from the executor being shutdown/gc-ed, or from the
        # shutdown of the python interpreter.
        worker_sentinels = [p.sentinel for p in processes.values()]
        ready = mp.connection.wait(readers + worker_sentinels)

        # The pool is considered broken unless a result could be read or the
        # wait was ended by a wakeup; i.e. when only a worker sentinel is
        # ready, or when receiving the result failed.
        cause = None
        is_broken = True
        if result_reader in ready:
            try:
                result_item = result_reader.recv()
                is_broken = False
            except BaseException as e:
                cause = traceback.format_exception(type(e), e, e.__traceback__)

        elif wakeup_reader in ready:
            is_broken = False
            result_item = None
        thread_wakeup.clear()
        if is_broken:
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = ('A child process terminated '
                                    'abruptly, the process pool is not '
                                    'usable anymore')
                executor._shutdown_thread = True
                executor = None
            bpe = BrokenProcessPool("A process in the process pool was "
                                    "terminated abruptly while the future was "
                                    "running or pending.")
            if cause is not None:
                bpe.__cause__ = _RemoteTraceback(
                    f"\n'''\n{''.join(cause)}'''")
            # All futures in flight must be marked failed
            for work_item in pending_work_items.values():
                work_item.future.set_exception(bpe)
                # Delete references to object. See issue16284
                del work_item
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return
        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284
                del work_item
            # Delete reference to result_item
            del result_item

        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if shutting_down():
            try:
                # Flag the executor as shutting down as early as possible if it
                # is not gc-ed yet.
                if executor is not None:
                    executor._shutdown_thread = True
                    # Unless there are pending work items, we have nothing to
                    # cancel.
                    if pending_work_items and executor._cancel_pending_futures:
                        # Cancel all pending futures and update
                        # pending_work_items to only have futures that are
                        # currently running.
                        new_pending_work_items = {}
                        for work_id, work_item in pending_work_items.items():
                            if not work_item.future.cancel():
                                new_pending_work_items[work_id] = work_item

                        # NOTE(review): this rebinds the local name only; the
                        # dict object that was passed in by the caller is left
                        # unchanged.
                        pending_work_items = new_pending_work_items
                        # Drain work_ids_queue since we no longer need to
                        # add items to the call queue.
                        while True:
                            try:
                                work_ids_queue.get_nowait()
                            except queue.Empty:
                                break

                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (in
                # result_queue.get()) and be able to send a sentinel again.
                pass
        executor = None
Brian Quinlan81c4d362010-09-18 22:35:02 +0000475
Thomas Moreau94459fd2018-01-05 11:15:54 +0100476
# Cache for _check_system_limits(): whether the check already ran, and the
# error message to raise again if the system was found to be too limited.
_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    """Raise NotImplementedError if the system provides too few semaphores.

    The verdict is cached in module globals: once the check has run, later
    calls either raise the stored error again or return immediately without
    querying the system a second time.

    Raises:
        NotImplementedError: If SC_SEM_NSEMS_MAX is reported and is below 256.
    """
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
        # Already verified as sufficient: skip the repeated sysconf() lookup.
        return
    _system_limits_checked = True
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # undetermined limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores (%d"
                       " available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)
503
Antoine Pitroudd696492011-06-08 17:21:55 +0200504
def _chain_from_iterable_of_lists(iterable):
    """
    Specialized implementation of itertools.chain.from_iterable.
    Each item in *iterable* should be a list.  This function is
    careful not to keep references to yielded objects.

    Each list is reversed in place and then emptied by popping, so the lists
    handed in are consumed.
    """
    for element in iterable:
        element.reverse()
        while element:
            yield element.pop()
515
516
class BrokenProcessPool(_base.BrokenExecutor):
    """
    Raised when a process in a ProcessPoolExecutor terminated abruptly
    while a future was in the running state.
    """
522
523
class ProcessPoolExecutor(_base.Executor):
    """Executor that evaluates calls in a pool of worker processes."""

    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=()):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
            mp_context: A multiprocessing context to launch the workers. This
                object should provide SimpleQueue, Queue and Process.
            initializer: A callable used to initialize worker processes.
            initargs: A tuple of arguments to pass to the initializer.

        Raises:
            ValueError: If max_workers is not greater than 0, or is above the
                supported maximum on Windows.
            TypeError: If initializer is neither None nor callable.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.cpu_count() or 1
            if sys.platform == 'win32':
                self._max_workers = min(_MAX_WINDOWS_WORKERS,
                                        self._max_workers)
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")
            elif (sys.platform == 'win32' and
                    max_workers > _MAX_WINDOWS_WORKERS):
                raise ValueError(
                    f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")

            self._max_workers = max_workers

        if mp_context is None:
            mp_context = mp.get_context()
        self._mp_context = mp_context

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")
        self._initializer = initializer
        self._initargs = initargs

        # Management thread
        self._queue_management_thread = None

        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._broken = False
        self._queue_count = 0
        self._pending_work_items = {}
        self._cancel_pending_futures = False

        # _ThreadWakeup is a communication channel used to interrupt the wait
        # of the main loop of queue_manager_thread from another thread (e.g.
        # when calling executor.submit or executor.shutdown). We do not use the
        # _result_queue to send the wakeup signal to the queue_manager_thread
        # as it could result in a deadlock if a worker process dies with the
        # _result_queue write lock still acquired.
        self._queue_management_thread_wakeup = _ThreadWakeup()

        # Create communication channels for the executor
        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = _SafeQueue(
            max_size=queue_size, ctx=self._mp_context,
            pending_work_items=self._pending_work_items,
            thread_wakeup=self._queue_management_thread_wakeup)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = mp_context.SimpleQueue()
        self._work_ids = queue.Queue()

    def _start_queue_management_thread(self):
        """Start the worker processes and the management thread, only once."""
        if self._queue_management_thread is None:
            # When the executor gets garbage collected, the weakref callback
            # will wake up the queue management thread so that it can terminate
            # if there is no pending work item.
            def weakref_cb(_,
                           thread_wakeup=self._queue_management_thread_wakeup):
                mp.util.debug('Executor collected: triggering callback for'
                              ' QueueManager wakeup')
                thread_wakeup.wakeup()
            # Start the processes so that their sentinels are known.
            self._adjust_process_count()
            self._queue_management_thread = threading.Thread(
                target=_queue_management_worker,
                args=(weakref.ref(self, weakref_cb),
                      self._processes,
                      self._pending_work_items,
                      self._work_ids,
                      self._call_queue,
                      self._result_queue,
                      self._queue_management_thread_wakeup),
                name="QueueManagerThread")
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            # Register the thread so that the exit handler can wake it up and
            # join it.
            _threads_wakeups[self._queue_management_thread] = \
                self._queue_management_thread_wakeup

    def _adjust_process_count(self):
        """Start worker processes until there are ``_max_workers`` of them."""
        for _ in range(len(self._processes), self._max_workers):
            p = self._mp_context.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue,
                      self._initializer,
                      self._initargs))
            p.start()
            self._processes[p.pid] = p

    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool(self._broken)
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _global_shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._queue_management_thread_wakeup.wakeup()

            self._start_queue_management_thread()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a time.

        Returns:
            An iterator equivalent to: map(fn, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            ValueError: If chunksize is less than 1.
            Exception: If fn(*args) raises for any values.
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        results = super().map(partial(_process_chunk, fn),
                              _get_chunks(*iterables, chunksize=chunksize),
                              timeout=timeout)
        return _chain_from_iterable_of_lists(results)

    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._cancel_pending_futures = cancel_futures
            self._shutdown_thread = True

        if self._queue_management_thread:
            # Wake up queue management thread
            self._queue_management_thread_wakeup.wakeup()
            if wait:
                self._queue_management_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        self._call_queue = None
        self._result_queue = None
        self._processes = None
        self._queue_management_thread_wakeup = None

    shutdown.__doc__ = _base.Executor.shutdown.__doc__
713
# Install the exit handler that wakes and joins the queue management threads.
atexit.register(_python_exit)