# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  WorkItem from the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
"""
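
# A minimal usage sketch of the flow described above (illustrative only;
# ``square`` here is just a stand-in callable defined at module level so the
# worker processes can import it):
#
#     from concurrent.futures import ProcessPoolExecutor
#
#     def square(x):
#         return x * x
#
#     if __name__ == '__main__':
#         with ProcessPoolExecutor(max_workers=2) as executor:
#             future = executor.submit(square, 3)
#             assert future.result() == 9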

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

import atexit
import os
from concurrent.futures import _base
import queue
from queue import Full
import multiprocessing as mp
import multiprocessing.connection
from multiprocessing.queues import Queue
import threading
import weakref
from functools import partial
import itertools
import sys
import traceback

# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.

_threads_wakeups = weakref.WeakKeyDictionary()
_global_shutdown = False


class _ThreadWakeup:
    def __init__(self):
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        self._writer.close()
        self._reader.close()

    def wakeup(self):
        self._writer.send_bytes(b"")

    def clear(self):
        while self._reader.poll():
            self._reader.recv_bytes()


def _python_exit():
    global _global_shutdown
    _global_shutdown = True
    items = list(_threads_wakeups.items())
    for _, thread_wakeup in items:
        thread_wakeup.wakeup()
    for t, _ in items:
        t.join()

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1
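# For example, with max_workers=4 the call queue created in
# ProcessPoolExecutor.__init__ holds at most 4 + EXTRA_QUEUED_CALLS == 5 items.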


# On Windows, WaitForMultipleObjects is used to wait for processes to finish.
# It can wait on, at most, 63 objects. There is an overhead of two objects:
# - the result queue reader
# - the thread wakeup reader
_MAX_WINDOWS_WORKERS = 63 - 2

# Hack to embed stringification of remote traceback in local traceback

class _RemoteTraceback(Exception):
    def __init__(self, tb):
        self.tb = tb
    def __str__(self):
        return self.tb

class _ExceptionWithTraceback:
    def __init__(self, exc, tb):
        tb = traceback.format_exception(type(exc), exc, tb)
        tb = ''.join(tb)
        self.exc = exc
        self.tb = '\n"""\n%s"""' % tb
    def __reduce__(self):
        return _rebuild_exc, (self.exc, self.tb)

def _rebuild_exc(exc, tb):
    exc.__cause__ = _RemoteTraceback(tb)
    return exc
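
# Illustrative round trip (a sketch, not executed here): pickling an
# _ExceptionWithTraceback and unpickling it on the other side yields the
# original exception with the formatted remote traceback attached as its
# __cause__, so it is shown when the local traceback is printed:
#
#     import pickle
#     try:
#         1 / 0
#     except ZeroDivisionError as e:
#         wrapped = _ExceptionWithTraceback(e, e.__traceback__)
#     exc = pickle.loads(pickle.dumps(wrapped))
#     assert isinstance(exc.__cause__, _RemoteTraceback)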

class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

class _ResultItem(object):
    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result

class _CallItem(object):
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs


class _SafeQueue(Queue):
    """Safe Queue that sets an exception on the future object linked to the job."""
    def __init__(self, max_size=0, *, ctx, pending_work_items):
        self.pending_work_items = pending_work_items
        super().__init__(max_size, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        if isinstance(obj, _CallItem):
            tb = traceback.format_exception(type(e), e, e.__traceback__)
            e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
            work_item = self.pending_work_items.pop(obj.work_id, None)
            # work_item can be None if another process terminated. In this case,
            # the queue_manager_thread fails all work_items with BrokenProcessPool
            if work_item is not None:
                work_item.future.set_exception(e)
        else:
            super()._on_queue_feeder_error(e, obj)


def _get_chunks(*iterables, chunksize):
    """ Iterates over zip()ed iterables in chunks. """
    it = zip(*iterables)
    while True:
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield chunk
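# For example (illustrative), _get_chunks('abc', [1, 2, 3], chunksize=2) yields
# (('a', 1), ('b', 2)) and then (('c', 3),).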

def _process_chunk(fn, chunk):
    """ Processes a chunk of an iterable passed to map.

    Runs the function passed to map() on a chunk of the
    iterable passed to map.

    This function is run in a separate process.

    """
    return [fn(*args) for args in chunk]
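# For example (illustrative), _process_chunk(pow, [(2, 3), (3, 2)]) returns
# [8, 9].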


def _sendback_result(result_queue, work_id, result=None, exception=None):
    """Safely send back the given result or exception"""
    try:
        result_queue.put(_ResultItem(work_id, result=result,
                                     exception=exception))
    except BaseException as e:
        exc = _ExceptionWithTraceback(e, e.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=exc))


def _process_worker(call_queue, result_queue, initializer, initargs):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A ctx.Queue of _ResultItems that will be written
            to by the worker.
        initializer: A callable initializer, or None
        initargs: A tuple of args for the initializer
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            # The parent will notice that the process stopped and
            # mark the pool broken
            return
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id, exception=exc)
        else:
            _sendback_result(result_queue, call_item.work_id, result=r)
            del r

        # Liberate the resource as soon as possible, to avoid holding onto
        # open files or shared memory that is not needed anymore
        del call_item


def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Fills call_queue with _WorkItems from pending_work_items.

    This function never blocks.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while True:
        if call_queue.full():
            return
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        else:
            work_item = pending_work_items[work_id]

            if work_item.future.set_running_or_notify_cancel():
                call_queue.put(_CallItem(work_id,
                                         work_item.fn,
                                         work_item.args,
                                         work_item.kwargs),
                               block=True)
            else:
                del pending_work_items[work_id]
                continue


def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue,
                             thread_wakeup):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A dict mapping pids to the ctx.Process instances used as
            workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A ctx.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A ctx.SimpleQueue of _ResultItems generated by the
            process workers.
        thread_wakeup: A _ThreadWakeup to allow waking up the
            queue_manager_thread from the main Thread and avoid deadlocks
            caused by permanently locked queues.
    """
    executor = None

    def shutting_down():
        return (_global_shutdown or executor is None
                or executor._shutdown_thread)

    def shutdown_worker():
        # This is an upper bound on the number of children alive.
        n_children_alive = sum(p.is_alive() for p in processes.values())
        n_children_to_stop = n_children_alive
        n_sentinels_sent = 0
        # Send the right number of sentinels, to make sure all children are
        # properly terminated.
        while n_sentinels_sent < n_children_to_stop and n_children_alive > 0:
            for i in range(n_children_to_stop - n_sentinels_sent):
                try:
                    call_queue.put_nowait(None)
                    n_sentinels_sent += 1
                except Full:
                    break
            n_children_alive = sum(p.is_alive() for p in processes.values())

        # Release the queue's resources as soon as possible.
        call_queue.close()
        # If .join() is not called on the created processes then
        # some ctx.Queue methods may deadlock on Mac OS X.
        for p in processes.values():
            p.join()

    result_reader = result_queue._reader
    wakeup_reader = thread_wakeup._reader
    readers = [result_reader, wakeup_reader]

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        # Wait for a result to be ready in the result_queue while checking
        # that all worker processes are still running, or for a wake up
        # signal to be sent. Wake up signals come either from new tasks being
        # submitted, from the executor being shutdown/gc-ed, or from the
        # shutdown of the python interpreter.
        worker_sentinels = [p.sentinel for p in processes.values()]
        ready = mp.connection.wait(readers + worker_sentinels)

        cause = None
        is_broken = True
        if result_reader in ready:
            try:
                result_item = result_reader.recv()
                is_broken = False
            except BaseException as e:
                cause = traceback.format_exception(type(e), e, e.__traceback__)

        elif wakeup_reader in ready:
            is_broken = False
            result_item = None
        thread_wakeup.clear()
        if is_broken:
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = ('A child process terminated '
                                    'abruptly, the process pool is not '
                                    'usable anymore')
                executor._shutdown_thread = True
                executor = None
            bpe = BrokenProcessPool("A process in the process pool was "
                                    "terminated abruptly while the future was "
                                    "running or pending.")
            if cause is not None:
                bpe.__cause__ = _RemoteTraceback(
                    f"\n'''\n{''.join(cause)}'''")
            # All futures in flight must be marked failed
            for work_id, work_item in pending_work_items.items():
                work_item.future.set_exception(bpe)
                # Delete references to object. See issue16284
                del work_item
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return
        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284
                del work_item
            # Delete reference to result_item
            del result_item

        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if shutting_down():
            try:
                # Flag the executor as shutting down as early as possible if it
                # is not gc-ed yet.
                if executor is not None:
                    executor._shutdown_thread = True
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (in
                # result_queue.get()) and be able to send a sentinel again.
                pass
        executor = None


_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
    _system_limits_checked = True
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # undetermined limit, assume that the limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores (%d"
                       " available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)


def _chain_from_iterable_of_lists(iterable):
    """
    Specialized implementation of itertools.chain.from_iterable.
    Each item in *iterable* should be a list. This function is
    careful not to keep references to yielded objects.
    """
    for element in iterable:
        element.reverse()
        while element:
            yield element.pop()
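# For example (illustrative),
# list(_chain_from_iterable_of_lists([[1, 2], [3]])) evaluates to [1, 2, 3];
# each inner list is emptied as its items are yielded.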


class BrokenProcessPool(_base.BrokenExecutor):
    """
    Raised when a process in a ProcessPoolExecutor terminated abruptly
    while a future was in the running state.
    """


class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=()):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
            mp_context: A multiprocessing context to launch the workers. This
                object should provide SimpleQueue, Queue and Process.
            initializer: A callable used to initialize worker processes.
            initargs: A tuple of arguments to pass to the initializer.
        """
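        # Illustrative construction (a sketch; _setup_worker is a hypothetical
        # module-level callable, picklable so child processes can run it):
        #
        #     ctx = mp.get_context('spawn')
        #     executor = ProcessPoolExecutor(max_workers=4, mp_context=ctx,
        #                                    initializer=_setup_worker,
        #                                    initargs=('verbose',))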
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.cpu_count() or 1
            if sys.platform == 'win32':
                self._max_workers = min(_MAX_WINDOWS_WORKERS,
                                        self._max_workers)
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")
            elif (sys.platform == 'win32' and
                  max_workers > _MAX_WINDOWS_WORKERS):
                raise ValueError(
                    f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")

            self._max_workers = max_workers

        if mp_context is None:
            mp_context = mp.get_context()
        self._mp_context = mp_context

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")
        self._initializer = initializer
        self._initargs = initargs

        # Management thread
        self._queue_management_thread = None

        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._broken = False
        self._queue_count = 0
        self._pending_work_items = {}

        # Create communication channels for the executor
        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = _SafeQueue(
            max_size=queue_size, ctx=self._mp_context,
            pending_work_items=self._pending_work_items)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = mp_context.SimpleQueue()
        self._work_ids = queue.Queue()

        # _ThreadWakeup is a communication channel used to interrupt the wait
        # of the main loop of queue_manager_thread from another thread (e.g.
        # when calling executor.submit or executor.shutdown). We do not use the
        # _result_queue to send the wakeup signal to the queue_manager_thread
        # as it could result in a deadlock if a worker process dies with the
        # _result_queue write lock still acquired.
        self._queue_management_thread_wakeup = _ThreadWakeup()

    def _start_queue_management_thread(self):
        if self._queue_management_thread is None:
            # When the executor gets garbage collected, the weakref callback
            # will wake up the queue management thread so that it can terminate
            # if there is no pending work item.
            def weakref_cb(_,
                           thread_wakeup=self._queue_management_thread_wakeup):
                mp.util.debug('Executor collected: triggering callback for'
                              ' QueueManager wakeup')
                thread_wakeup.wakeup()
            # Start the processes so that their sentinels are known.
            self._adjust_process_count()
            self._queue_management_thread = threading.Thread(
                target=_queue_management_worker,
                args=(weakref.ref(self, weakref_cb),
                      self._processes,
                      self._pending_work_items,
                      self._work_ids,
                      self._call_queue,
                      self._result_queue,
                      self._queue_management_thread_wakeup),
                name="QueueManagerThread")
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            _threads_wakeups[self._queue_management_thread] = \
                self._queue_management_thread_wakeup

    def _adjust_process_count(self):
        for _ in range(len(self._processes), self._max_workers):
            p = self._mp_context.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue,
                      self._initializer,
                      self._initargs))
            p.start()
            self._processes[p.pid] = p

    def submit(*args, **kwargs):
        if len(args) >= 2:
            self, fn, *args = args
        elif not args:
            raise TypeError("descriptor 'submit' of 'ProcessPoolExecutor' object "
                            "needs an argument")
        elif 'fn' in kwargs:
            fn = kwargs.pop('fn')
            self, *args = args
            import warnings
            warnings.warn("Passing 'fn' as keyword argument is deprecated",
                          DeprecationWarning, stacklevel=2)
        else:
            raise TypeError('submit expected at least 1 positional argument, '
                            'got %d' % (len(args)-1))

        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool(self._broken)
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _global_shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._queue_management_thread_wakeup.wakeup()

            self._start_queue_management_thread()
            return f
    submit.__text_signature__ = _base.Executor.submit.__text_signature__
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a time.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
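        # For example (illustrative), executor.map(pow, [2, 3, 4], [5, 4, 3],
        # chunksize=2) dispatches the chunks [(2, 5), (3, 4)] and [(4, 3)] and
        # yields 32, 81, 64 in order.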
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        results = super().map(partial(_process_chunk, fn),
                              _get_chunks(*iterables, chunksize=chunksize),
                              timeout=timeout)
        return _chain_from_iterable_of_lists(results)

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if self._queue_management_thread:
            # Wake up queue management thread
            self._queue_management_thread_wakeup.wakeup()
            if wait:
                self._queue_management_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        if self._call_queue is not None:
            self._call_queue.close()
            if wait:
                self._call_queue.join_thread()
            self._call_queue = None
        self._result_queue = None
        self._processes = None

        if self._queue_management_thread_wakeup:
            self._queue_management_thread_wakeup.close()
            self._queue_management_thread_wakeup = None

    shutdown.__doc__ = _base.Executor.shutdown.__doc__

atexit.register(_python_exit)