blob: aaa5151e017c0f7421b6ec06dbfc7cac0895d793 [file] [log] [blame]
Brian Quinlan81c4d362010-09-18 22:35:02 +00001# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue
- wakes up the local worker thread

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  WorkItem from the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry
- besides "Result Q", it also waits on the sentinels of the worker processes
  (a worker that disappears without a clean shutdown marks the pool broken)
  and on a private wakeup pipe that is written to by submit(), shutdown(),
  the executor's garbage collection and the interpreter exit handler

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
- on reading a None sentinel from "Call Q", puts its own PID in "Result Q"
  and exits
"""

__author__ = 'Brian Quinlan (brian@sweetapp.com)'
47
48import atexit
Antoine Pitroudd696492011-06-08 17:21:55 +020049import os
Brian Quinlan81c4d362010-09-18 22:35:02 +000050from concurrent.futures import _base
51import queue
Richard Oudkerk1f2eaa92013-10-16 17:06:22 +010052from queue import Full
Thomas Moreaue8c368d2017-10-03 11:53:17 +020053import multiprocessing as mp
Antoine Pitroubdb1cf12012-03-05 19:28:37 +010054from multiprocessing.connection import wait
Thomas Moreau94459fd2018-01-05 11:15:54 +010055from multiprocessing.queues import Queue
Brian Quinlan81c4d362010-09-18 22:35:02 +000056import threading
57import weakref
Antoine Pitrou4aae2762014-10-04 20:20:10 +020058from functools import partial
59import itertools
Antoine Pitrou1285c9b2015-01-17 20:02:14 +010060import traceback
Brian Quinlan81c4d362010-09-18 22:35:02 +000061
# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.

# Registry mapping each queue management thread to the _ThreadWakeup used to
# interrupt its wait. The threads are held as weak keys, so an entry goes away
# once its thread object is garbage collected.
_threads_wakeups = weakref.WeakKeyDictionary()
# Set to True by the interpreter exit handler (_python_exit); the queue
# management threads read it to decide whether new work can still arrive.
_global_shutdown = False
Brian Quinlan81c4d362010-09-18 22:35:02 +000078
Thomas Moreau94459fd2018-01-05 11:15:54 +010079
class _ThreadWakeup:
    """One-way pipe used to interrupt the queue management thread's wait.

    The management thread waits on ``_reader`` together with its other
    connections; any thread may call wakeup() to make that wait return, and
    the management thread calls clear() to consume the pending signals.
    """
    # Must be spelled ``__slots__`` (a ``__slot__`` attribute is silently
    # ignored by Python). Only the two pipe ends are ever stored.
    __slots__ = ("_reader", "_writer")

    def __init__(self):
        self._reader, self._writer = mp.Pipe(duplex=False)

    def wakeup(self):
        """Signal the reader end by sending an empty message."""
        self._writer.send_bytes(b"")

    def clear(self):
        """Drain every pending wakeup message without blocking."""
        while self._reader.poll():
            self._reader.recv_bytes()
92
93
def _python_exit():
    """Interpreter exit hook that stops every queue management thread.

    Raises the global shutdown flag, signals each registered management
    thread so it notices the flag, then blocks until all of them are done.
    """
    global _global_shutdown
    _global_shutdown = True
    # Take a snapshot: the registry is weak-keyed and may change under us.
    registered = list(_threads_wakeups.items())
    # Signal everyone first so the threads can wind down concurrently...
    for waker in [wakeup for _thread, wakeup in registered]:
        waker.wakeup()
    # ...and only then wait for each of them.
    for thread in [thread for thread, _wakeup in registered]:
        thread.join()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000102
103# Controls how many more calls than processes will be queued in the call queue.
104# A smaller number will mean that processes spend more time idle waiting for
105# work while a larger number will make Future.cancel() succeed less frequently
106# (Futures in the call queue cannot be cancelled).
107EXTRA_QUEUED_CALLS = 1
108
Thomas Moreau94459fd2018-01-05 11:15:54 +0100109
# Hack to embed the stringified remote traceback in the local traceback.

class _RemoteTraceback(Exception):
    """Carrier exception whose str() is a preformatted traceback text.

    It is chained as ``__cause__`` of an exception so that the traceback
    captured elsewhere is printed as part of the local traceback.
    """

    def __init__(self, tb):
        self.tb = tb

    def __str__(self):
        # The text is already fully formatted; hand it back verbatim.
        return self.tb
117
class _ExceptionWithTraceback:
    """Picklable wrapper pairing an exception with its formatted traceback.

    Pickling goes through _rebuild_exc, which hands back the wrapped
    exception with the traceback text chained as its ``__cause__``.
    """

    def __init__(self, exc, tb):
        formatted_lines = traceback.format_exception(type(exc), exc, tb)
        self.exc = exc
        self.tb = '\n"""\n%s"""' % ''.join(formatted_lines)

    def __reduce__(self):
        # Unpickle as the original exception, not as this wrapper class.
        return _rebuild_exc, (self.exc, self.tb)
126
def _rebuild_exc(exc, tb):
    """Unpickling hook for _ExceptionWithTraceback.

    Chains the formatted traceback text *tb* onto *exc* via ``__cause__``
    and returns *exc* itself.
    """
    cause = _RemoteTraceback(tb)
    exc.__cause__ = cause
    return exc
130
class _WorkItem:
    """A submitted call together with the Future that will hold its outcome."""

    def __init__(self, future, fn, args, kwargs):
        self.future, self.fn = future, fn
        self.args, self.kwargs = args, kwargs
137
class _ResultItem:
    """Outcome of one call (result or exception) keyed by its work id."""

    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception, self.result = exception, result
143
class _CallItem:
    """A call sent to a worker process, tagged with its work id."""

    def __init__(self, work_id, fn, args, kwargs):
        self.work_id, self.fn = work_id, fn
        self.args, self.kwargs = args, kwargs
150
Antoine Pitrou63ff4132017-11-04 11:05:49 +0100151
class _SafeQueue(Queue):
    """Call queue that fails the matching future when an item cannot be sent.

    When the queue's feeder reports an error for a _CallItem (e.g. it could
    not be pickled), the error is set on that work item's future instead of
    going to the default handler.
    """

    def __init__(self, max_size=0, *, ctx, pending_work_items):
        self.pending_work_items = pending_work_items
        super().__init__(max_size, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        if not isinstance(obj, _CallItem):
            super()._on_queue_feeder_error(e, obj)
            return
        formatted = ''.join(
            traceback.format_exception(type(e), e, e.__traceback__))
        e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(formatted))
        work_item = self.pending_work_items.pop(obj.work_id, None)
        # The entry may be gone already when another process terminated:
        # the queue management thread then fails every pending work item
        # with BrokenProcessPool on its own.
        if work_item is not None:
            work_item.future.set_exception(e)
169
170
def _get_chunks(*iterables, chunksize):
    """Yield successive tuples of up to *chunksize* items of zip(*iterables).

    The last chunk may be shorter; nothing is yielded for empty input.
    """
    zipped = zip(*iterables)
    chunk = tuple(itertools.islice(zipped, chunksize))
    while chunk:
        yield chunk
        chunk = tuple(itertools.islice(zipped, chunksize))
179
def _process_chunk(fn, chunk):
    """Apply *fn* to every argument tuple of *chunk*; runs in a worker process.

    Each element of *chunk* (as produced by _get_chunks for map()) is
    unpacked as positional arguments, and the results are returned as a
    list in the same order.
    """
    return list(itertools.starmap(fn, chunk))
190
Thomas Moreau94459fd2018-01-05 11:15:54 +0100191
def _sendback_result(result_queue, work_id, result=None, exception=None):
    """Report the outcome of *work_id* through *result_queue*.

    If putting the item fails (for instance because the result cannot be
    pickled), that failure is wrapped together with its traceback and
    reported for the same work id instead.
    """
    try:
        result_queue.put(
            _ResultItem(work_id, result=result, exception=exception))
    except BaseException as send_error:
        wrapped = _ExceptionWithTraceback(send_error,
                                          send_error.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=wrapped))
200
201
def _process_worker(call_queue, result_queue, initializer, initargs):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker. A None item asks the worker to exit.
        result_queue: A ctx.SimpleQueue that will be written to by the
            worker: one _ResultItem per call, and the worker's PID (an int)
            right before it exits on a None sentinel.
        initializer: A callable initializer, or None
        initargs: A tuple of args for the initializer
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            # The parent will notice that the process stopped and
            # mark the pool broken
            return
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id, exception=exc)
            # Drop the wrapper right away: it references the exception and,
            # through its __traceback__, this very frame (a reference cycle).
            # Otherwise it would live until the next call item arrives.
            del exc
        else:
            _sendback_result(result_queue, call_item.work_id, result=r)
            # The result may be large; do not keep it alive while blocked
            # in call_queue.get() waiting for the next call item.
            del r

        # Liberate the resource as soon as possible, to avoid holding onto
        # open files or shared memory that is not needed anymore
        del call_item
240
241
def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Move as many pending work items as fit into call_queue, without waiting.

    Work ids are taken from *work_ids* until either it is empty or
    *call_queue* reports itself full. Each id is resolved through
    *pending_work_items*: items whose future was cancelled are dropped from
    that dict, the others are marked running and sent as _CallItems.

    Args:
        pending_work_items: dict of work id -> _WorkItem, e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: queue.Queue of work ids, e.g. Queue([5, 6, ...]).
        call_queue: multiprocessing.Queue receiving the _CallItems.
    """
    while not call_queue.full():
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        work_item = pending_work_items[work_id]
        if not work_item.future.set_running_or_notify_cancel():
            # Cancelled before reaching a worker: just forget about it.
            del pending_work_items[work_id]
            continue
        call_queue.put(_CallItem(work_id,
                                 work_item.fn,
                                 work_item.args,
                                 work_item.kwargs),
                       block=True)
278
Thomas Moreau94459fd2018-01-05 11:15:54 +0100279
def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue,
                             thread_wakeup):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A dict mapping worker PIDs to the ctx.Process instances
            used as workers. Entries are popped here as workers exit cleanly.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A ctx.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A ctx.SimpleQueue of _ResultItems generated by the
            process workers. A worker exiting cleanly puts its PID (an int)
            on it instead.
        thread_wakeup: A _ThreadWakeup to allow waking up the
            queue_manager_thread from the main Thread and avoid deadlocks
            caused by permanently locked queues.

    Returns None once all workers have been shut down: either cleanly, or
    after the pool broke and the remaining workers were terminated.
    """
    # Strong reference to the executor, only bound while it is inspected and
    # reset to None at the end of every iteration so that this thread does not
    # keep the executor alive (executor_reference is a weak reference).
    executor = None

    def shutting_down():
        # True when no new work can arrive: the interpreter is exiting, the
        # executor is not currently bound (collected, or merely reset to None
        # at the end of an iteration), or executor.shutdown() was called.
        return (_global_shutdown or executor is None
                or executor._shutdown_thread)

    def shutdown_worker():
        # Ask every live worker to exit with one None sentinel each, then
        # release the call queue and join all worker processes.
        # This is an upper bound on the number of children alive.
        n_children_alive = sum(p.is_alive() for p in processes.values())
        n_children_to_stop = n_children_alive
        n_sentinels_sent = 0
        # Send the right number of sentinels, to make sure all children are
        # properly terminated.
        # call_queue is bounded, so put_nowait() may raise Full; the outer
        # loop then retries (without sleeping) until enough sentinels were
        # sent or no child is alive anymore.
        while n_sentinels_sent < n_children_to_stop and n_children_alive > 0:
            for i in range(n_children_to_stop - n_sentinels_sent):
                try:
                    call_queue.put_nowait(None)
                    n_sentinels_sent += 1
                except Full:
                    break
            n_children_alive = sum(p.is_alive() for p in processes.values())

        # Release the queue's resources as soon as possible.
        call_queue.close()
        # If .join() is not called on the created processes then
        # some ctx.Queue methods may deadlock on Mac OS X.
        for p in processes.values():
            p.join()

    result_reader = result_queue._reader
    wakeup_reader = thread_wakeup._reader
    readers = [result_reader, wakeup_reader]

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        # Wait for a result to be ready in the result_queue while checking
        # that all worker processes are still running, or for a wake up
        # signal sent. The wake up signals come either from new tasks being
        # submitted, from the executor being shutdown/gc-ed, or from the
        # shutdown of the python interpreter.
        worker_sentinels = [p.sentinel for p in processes.values()]
        ready = wait(readers + worker_sentinels)

        # The pool is considered broken unless a result was received or a
        # wakeup signal arrived: if only worker sentinels are ready, a worker
        # exited without going through the clean-shutdown (PID) path.
        cause = None
        is_broken = True
        if result_reader in ready:
            try:
                result_item = result_reader.recv()
                is_broken = False
            except BaseException as e:
                # The result could not be received (e.g. unpickling failed);
                # keep the formatted traceback to attach it as the cause.
                cause = traceback.format_exception(type(e), e, e.__traceback__)

        elif wakeup_reader in ready:
            is_broken = False
            result_item = None
        thread_wakeup.clear()
        if is_broken:
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = ('A child process terminated '
                                    'abruptly, the process pool is not '
                                    'usable anymore')
                executor._shutdown_thread = True
                executor = None
            bpe = BrokenProcessPool("A process in the process pool was "
                                    "terminated abruptly while the future was "
                                    "running or pending.")
            if cause is not None:
                bpe.__cause__ = _RemoteTraceback(
                    f"\n'''\n{''.join(cause)}'''")
            # All futures in flight must be marked failed
            for work_id, work_item in pending_work_items.items():
                work_item.future.set_exception(bpe)
                # Delete references to object. See issue16284
                del work_item
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return
        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            # NOTE(review): `executor` is always None at this point (it is
            # reset at the end of each iteration), so this assert is
            # trivially true.
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284
                del work_item
            # Delete reference to result_item
            del result_item

        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if shutting_down():
            try:
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (in
                # result_queue.get()) and be able to send a sentinel again.
                # NOTE(review): shutdown_worker() already catches Full itself
                # and this loop now waits with wait(), not result_queue.get(),
                # so this handler appears unreachable.
                pass
        executor = None
Brian Quinlan81c4d362010-09-18 22:35:02 +0000434
Thomas Moreau94459fd2018-01-05 11:15:54 +0100435
# Cache for _check_system_limits(): whether the check already ran, and the
# error message to re-raise if the system was found too limited.
_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    """Raise NotImplementedError if the OS provides too few semaphores.

    The first call queries ``os.sysconf("SC_SEM_NSEMS_MAX")`` and caches the
    verdict in module globals; subsequent calls only replay that verdict.

    Raises:
        NotImplementedError: fewer than 256 semaphores are available.
    """
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
        # Already checked and found sufficient: do not query sysconf again.
        return
    _system_limits_checked = True
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # indeterminate limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores (%d"
                       " available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)
462
Antoine Pitroudd696492011-06-08 17:21:55 +0200463
def _chain_from_iterable_of_lists(iterable):
    """
    Flatten an iterable of lists, like itertools.chain.from_iterable.

    Every list is consumed destructively (reversed, then popped from the
    end), so the generator holds no reference to items it already yielded.
    Each input list is left empty once it has been exhausted.
    """
    for sublist in iterable:
        sublist.reverse()
        while sublist:
            yield sublist.pop()
474
475
class BrokenProcessPool(_base.BrokenExecutor):
    """
    Raised when a worker process of a ProcessPoolExecutor terminates abruptly
    while a future is running or pending.

    Once raised, the executor is unusable: the futures still in flight fail
    with this exception and later submit() calls raise it as well.
    """
481
482
class ProcessPoolExecutor(_base.Executor):
    """Executor that runs submitted calls in a pool of worker processes.

    Work is handed to the workers by a daemon "QueueManagerThread" which,
    together with the worker processes, is started lazily by the first
    submit() call.
    """

    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=()):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
            mp_context: A multiprocessing context to launch the workers. This
                object should provide SimpleQueue, Queue and Process.
            initializer: A callable used to initialize worker processes.
            initargs: A tuple of arguments to pass to the initializer.

        Raises:
            ValueError: max_workers is given and is not greater than 0.
            TypeError: initializer is given but is not callable.
            NotImplementedError: the system provides too few semaphores.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.cpu_count() or 1
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")

            self._max_workers = max_workers

        if mp_context is None:
            mp_context = mp.get_context()
        self._mp_context = mp_context

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")
        self._initializer = initializer
        self._initargs = initargs

        # Management thread
        self._queue_management_thread = None

        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        # False, or the error message once the pool has been marked broken.
        self._broken = False
        self._queue_count = 0
        self._pending_work_items = {}

        # Create communication channels for the executor
        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = _SafeQueue(
            max_size=queue_size, ctx=self._mp_context,
            pending_work_items=self._pending_work_items)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = mp_context.SimpleQueue()
        self._work_ids = queue.Queue()

        # _ThreadWakeup is a communication channel used to interrupt the wait
        # of the main loop of queue_manager_thread from another thread (e.g.
        # when calling executor.submit or executor.shutdown). We do not use the
        # _result_queue to send the wakeup signal to the queue_manager_thread
        # as it could result in a deadlock if a worker process dies with the
        # _result_queue write lock still acquired.
        self._queue_management_thread_wakeup = _ThreadWakeup()

    def _start_queue_management_thread(self):
        # Start the worker processes and the management thread, once.
        if self._queue_management_thread is None:
            # When the executor gets garbage collected, the weakref callback
            # will wake up the queue management thread so that it can terminate
            # if there is no pending work item.
            # The wakeup is bound as a default argument so that the callback
            # holds no reference to self (which would keep it alive).
            def weakref_cb(_,
                           thread_wakeup=self._queue_management_thread_wakeup):
                mp.util.debug('Executor collected: triggering callback for'
                              ' QueueManager wakeup')
                thread_wakeup.wakeup()
            # Start the processes so that their sentinels are known.
            self._adjust_process_count()
            self._queue_management_thread = threading.Thread(
                target=_queue_management_worker,
                args=(weakref.ref(self, weakref_cb),
                      self._processes,
                      self._pending_work_items,
                      self._work_ids,
                      self._call_queue,
                      self._result_queue,
                      self._queue_management_thread_wakeup),
                name="QueueManagerThread")
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            # Register the thread so the interpreter exit handler can wake
            # it up and join it.
            _threads_wakeups[self._queue_management_thread] = \
                self._queue_management_thread_wakeup

    def _adjust_process_count(self):
        # Spawn worker processes until max_workers of them are registered.
        for _ in range(len(self._processes), self._max_workers):
            p = self._mp_context.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue,
                      self._initializer,
                      self._initargs))
            p.start()
            self._processes[p.pid] = p

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool(self._broken)
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')
            # After the exit handler has run, the management thread has been
            # joined (or would stop immediately), so a newly submitted call
            # could never complete: fail fast instead of hanging the caller.
            if _global_shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._queue_management_thread_wakeup.wakeup()

            self._start_queue_management_thread()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a time.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
            ValueError: If chunksize is less than one.
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        results = super().map(partial(_process_chunk, fn),
                              _get_chunks(*iterables, chunksize=chunksize),
                              timeout=timeout)
        return _chain_from_iterable_of_lists(results)

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if self._queue_management_thread:
            # Wake up queue management thread
            self._queue_management_thread_wakeup.wakeup()
            if wait:
                self._queue_management_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        if self._call_queue is not None:
            self._call_queue.close()
            if wait:
                self._call_queue.join_thread()
            self._call_queue = None
        self._result_queue = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
658
# At interpreter exit, _python_exit sets the global shutdown flag, wakes every
# registered queue management thread and joins it.
atexit.register(_python_exit)