# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call() |      |        |     | ...       |    |         |
|          |     |    future |      |        |     | 4, result |    |         |
|          |     | ...       |      |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  WorkItem from the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
"""
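
# A minimal, illustrative use of the public API whose internals are described
# above (assumed typical usage; this snippet is not executed by this module):
#
#     from concurrent.futures import ProcessPoolExecutor
#
#     def square(x):
#         return x * x
#
#     if __name__ == '__main__':
#         with ProcessPoolExecutor(max_workers=2) as executor:
#             fut = executor.submit(square, 3)      # becomes a _WorkItem
#             assert fut.result() == 9
#             assert list(executor.map(square, range(4))) == [0, 1, 4, 9]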

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

import atexit
import os
from concurrent.futures import _base
import queue
from queue import Full
import multiprocessing as mp
from multiprocessing.connection import wait
from multiprocessing.queues import Queue
import threading
import weakref
from functools import partial
import itertools
import traceback

# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.

_threads_wakeups = weakref.WeakKeyDictionary()
_global_shutdown = False


class _ThreadWakeup:
    def __init__(self):
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        self._writer.close()
        self._reader.close()

    def wakeup(self):
        self._writer.send_bytes(b"")

    def clear(self):
        while self._reader.poll():
            self._reader.recv_bytes()
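
# Illustration (assumed behaviour, not executed here): the wakeup channel is a
# simple self-pipe; wakeup() makes the read end pollable and clear() drains it.
#
#     w = _ThreadWakeup()
#     w.wakeup()
#     assert w._reader.poll()
#     w.clear()
#     assert not w._reader.poll()
#     w.close()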


def _python_exit():
    global _global_shutdown
    _global_shutdown = True
    items = list(_threads_wakeups.items())
    for _, thread_wakeup in items:
        thread_wakeup.wakeup()
    for t, _ in items:
        t.join()

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1


# Hack to embed stringification of remote traceback in local traceback

class _RemoteTraceback(Exception):
    def __init__(self, tb):
        self.tb = tb
    def __str__(self):
        return self.tb

class _ExceptionWithTraceback:
    def __init__(self, exc, tb):
        tb = traceback.format_exception(type(exc), exc, tb)
        tb = ''.join(tb)
        self.exc = exc
        self.tb = '\n"""\n%s"""' % tb
    def __reduce__(self):
        return _rebuild_exc, (self.exc, self.tb)

def _rebuild_exc(exc, tb):
    exc.__cause__ = _RemoteTraceback(tb)
    return exc

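# Illustration (assumed behaviour, not executed here): pickling the wrapper and
# unpickling it on the other side calls _rebuild_exc, so the exception a caller
# gets from Future.result() chains the remote traceback text as its __cause__.
#
#     import pickle
#     try:
#         1 / 0
#     except ZeroDivisionError as e:
#         wrapped = _ExceptionWithTraceback(e, e.__traceback__)
#     exc = pickle.loads(pickle.dumps(wrapped))
#     assert isinstance(exc, ZeroDivisionError)
#     assert isinstance(exc.__cause__, _RemoteTraceback)
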
class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

class _ResultItem(object):
    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result

class _CallItem(object):
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs


class _SafeQueue(Queue):
    """Safe Queue that sets the exception on the future linked to the failed job."""
    def __init__(self, max_size=0, *, ctx, pending_work_items):
        self.pending_work_items = pending_work_items
        super().__init__(max_size, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        if isinstance(obj, _CallItem):
            tb = traceback.format_exception(type(e), e, e.__traceback__)
            e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
            work_item = self.pending_work_items.pop(obj.work_id, None)
            # work_item can be None if another process terminated. In this case,
            # the queue_manager_thread fails all work_items with BrokenProcessPool
            if work_item is not None:
                work_item.future.set_exception(e)
        else:
            super()._on_queue_feeder_error(e, obj)


def _get_chunks(*iterables, chunksize):
    """ Iterates over zip()ed iterables in chunks. """
    it = zip(*iterables)
    while True:
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield chunk

def _process_chunk(fn, chunk):
    """ Processes a chunk of an iterable passed to map.

    Runs the function passed to map() on a chunk of the
    iterable passed to map.

    This function is run in a separate process.

    """
    return [fn(*args) for args in chunk]
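
# Illustration (assumed behaviour, not executed here): with chunksize=2,
#
#     list(_get_chunks([1, 2, 3], [10, 20, 30], chunksize=2))
#
# yields [((1, 10), (2, 20)), ((3, 30),)], and
#
#     _process_chunk(pow, ((2, 3), (3, 2)))
#
# returns [pow(2, 3), pow(3, 2)] == [8, 9].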


def _sendback_result(result_queue, work_id, result=None, exception=None):
    """Safely send back the given result or exception"""
    try:
        result_queue.put(_ResultItem(work_id, result=result,
                                     exception=exception))
    except BaseException as e:
        exc = _ExceptionWithTraceback(e, e.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=exc))


def _process_worker(call_queue, result_queue, initializer, initargs):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A ctx.Queue of _ResultItems that will be written
            to by the worker.
        initializer: A callable initializer, or None
        initargs: A tuple of args for the initializer
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            # The parent will notice that the process stopped and
            # mark the pool broken
            return
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id, exception=exc)
        else:
            _sendback_result(result_queue, call_item.work_id, result=r)

        # Liberate the resource as soon as possible, to avoid holding onto
        # open files or shared memory that is not needed anymore
        del call_item
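
# Illustration (assumed behaviour, not executed here): a None call item is the
# shutdown sentinel; the worker posts its own pid on the result queue and
# exits, which is how shutdown_worker() below accounts for stopped children.
#
#     ctx = mp.get_context()
#     call_q, result_q = ctx.Queue(), ctx.SimpleQueue()
#     call_q.put(_CallItem(1, pow, (2, 5), {}))
#     call_q.put(None)                      # sentinel
#     _process_worker(call_q, result_q, None, ())
#     assert result_q.get().result == 32    # _ResultItem for work id 1
#     assert result_q.get() == os.getpid()  # pid posted on clean shutdown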


def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Fills call_queue with _WorkItems from pending_work_items.

    This function never blocks.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while True:
        if call_queue.full():
            return
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        else:
            work_item = pending_work_items[work_id]

            if work_item.future.set_running_or_notify_cancel():
                call_queue.put(_CallItem(work_id,
                                         work_item.fn,
                                         work_item.args,
                                         work_item.kwargs),
                               block=True)
            else:
                del pending_work_items[work_id]
                continue


def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue,
                             thread_wakeup):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A dict mapping pids to the ctx.Process instances used as
            workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A ctx.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A ctx.SimpleQueue of _ResultItems generated by the
            process workers.
        thread_wakeup: A _ThreadWakeup to allow waking up the
            queue_manager_thread from the main Thread and avoid deadlocks
            caused by permanently locked queues.
    """
    executor = None

    def shutting_down():
        return (_global_shutdown or executor is None
                or executor._shutdown_thread)

    def shutdown_worker():
        # This is an upper bound on the number of children alive.
        n_children_alive = sum(p.is_alive() for p in processes.values())
        n_children_to_stop = n_children_alive
        n_sentinels_sent = 0
        # Send the right number of sentinels, to make sure all children are
        # properly terminated.
        while n_sentinels_sent < n_children_to_stop and n_children_alive > 0:
            for i in range(n_children_to_stop - n_sentinels_sent):
                try:
                    call_queue.put_nowait(None)
                    n_sentinels_sent += 1
                except Full:
                    break
            n_children_alive = sum(p.is_alive() for p in processes.values())

        # Release the queue's resources as soon as possible.
        call_queue.close()
        # If .join() is not called on the created processes then
        # some ctx.Queue methods may deadlock on Mac OS X.
        for p in processes.values():
            p.join()

    result_reader = result_queue._reader
    wakeup_reader = thread_wakeup._reader
    readers = [result_reader, wakeup_reader]

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        # Wait for a result to be ready in the result_queue while checking
        # that all worker processes are still running, or for a wake-up signal
        # to be sent. The wake-up signals come either from new tasks being
        # submitted, from the executor being shutdown/gc-ed, or from the
        # shutdown of the python interpreter.
        worker_sentinels = [p.sentinel for p in processes.values()]
        ready = wait(readers + worker_sentinels)

        cause = None
        is_broken = True
        if result_reader in ready:
            try:
                result_item = result_reader.recv()
                is_broken = False
            except BaseException as e:
                cause = traceback.format_exception(type(e), e, e.__traceback__)

        elif wakeup_reader in ready:
            is_broken = False
            result_item = None
        thread_wakeup.clear()
        if is_broken:
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = ('A child process terminated '
                                    'abruptly, the process pool is not '
                                    'usable anymore')
                executor._shutdown_thread = True
                executor = None
            bpe = BrokenProcessPool("A process in the process pool was "
                                    "terminated abruptly while the future was "
                                    "running or pending.")
            if cause is not None:
                bpe.__cause__ = _RemoteTraceback(
                    f"\n'''\n{''.join(cause)}'''")
            # All futures in flight must be marked failed
            for work_id, work_item in pending_work_items.items():
                work_item.future.set_exception(bpe)
                # Delete references to object. See issue16284
                del work_item
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return
        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284
                del work_item
            # Delete reference to result_item
            del result_item

        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if shutting_down():
            try:
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (in
                # result_queue.get()) and be able to send a sentinel again.
                pass
        executor = None


_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
    _system_limits_checked = True
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # undetermined limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores (%d"
                       " available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)


def _chain_from_iterable_of_lists(iterable):
    """
    Specialized implementation of itertools.chain.from_iterable.
    Each item in *iterable* should be a list. This function is
    careful not to keep references to yielded objects.
    """
    for element in iterable:
        element.reverse()
        while element:
            yield element.pop()

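# Illustration (assumed behaviour, not executed here): items come out in their
# original order, but each chunk list is emptied as it is consumed so finished
# results can be garbage collected while iteration continues.
#
#     assert list(_chain_from_iterable_of_lists(iter([[1, 2], [3]]))) == [1, 2, 3]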

class BrokenProcessPool(_base.BrokenExecutor):
    """
    Raised when a process in a ProcessPoolExecutor terminated abruptly
    while a future was in the running state.
    """


class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=()):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
            mp_context: A multiprocessing context to launch the workers. This
                object should provide SimpleQueue, Queue and Process.
            initializer: A callable used to initialize worker processes.
            initargs: A tuple of arguments to pass to the initializer.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.cpu_count() or 1
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")

            self._max_workers = max_workers

        if mp_context is None:
            mp_context = mp.get_context()
        self._mp_context = mp_context

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")
        self._initializer = initializer
        self._initargs = initargs

        # Management thread
        self._queue_management_thread = None

        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._broken = False
        self._queue_count = 0
        self._pending_work_items = {}

        # Create communication channels for the executor
        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = _SafeQueue(
            max_size=queue_size, ctx=self._mp_context,
            pending_work_items=self._pending_work_items)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = mp_context.SimpleQueue()
        self._work_ids = queue.Queue()

        # _ThreadWakeup is a communication channel used to interrupt the wait
        # of the main loop of queue_manager_thread from another thread (e.g.
        # when calling executor.submit or executor.shutdown). We do not use the
        # _result_queue to send the wakeup signal to the queue_manager_thread
        # as it could result in a deadlock if a worker process dies with the
        # _result_queue write lock still acquired.
        self._queue_management_thread_wakeup = _ThreadWakeup()

    def _start_queue_management_thread(self):
        if self._queue_management_thread is None:
            # When the executor gets garbage collected, the weakref callback
            # will wake up the queue management thread so that it can terminate
            # if there is no pending work item.
            def weakref_cb(_,
                           thread_wakeup=self._queue_management_thread_wakeup):
                mp.util.debug('Executor collected: triggering callback for'
                              ' QueueManager wakeup')
                thread_wakeup.wakeup()
            # Start the processes so that their sentinels are known.
            self._adjust_process_count()
            self._queue_management_thread = threading.Thread(
                target=_queue_management_worker,
                args=(weakref.ref(self, weakref_cb),
                      self._processes,
                      self._pending_work_items,
                      self._work_ids,
                      self._call_queue,
                      self._result_queue,
                      self._queue_management_thread_wakeup),
                name="QueueManagerThread")
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            _threads_wakeups[self._queue_management_thread] = \
                self._queue_management_thread_wakeup

    def _adjust_process_count(self):
        for _ in range(len(self._processes), self._max_workers):
            p = self._mp_context.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue,
                      self._initializer,
                      self._initargs))
            p.start()
            self._processes[p.pid] = p

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool(self._broken)
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._queue_management_thread_wakeup.wakeup()

            self._start_queue_management_thread()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, *iterables).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a time.

        Returns:
            An iterator equivalent to: map(fn, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        results = super().map(partial(_process_chunk, fn),
                              _get_chunks(*iterables, chunksize=chunksize),
                              timeout=timeout)
        return _chain_from_iterable_of_lists(results)
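
    # Illustration (assumed usage, not executed here): with chunksize=2 the
    # three arguments below are shipped to the workers as the chunks
    # ((-1,), (-2,)) and ((-3,),) rather than one call item per argument.
    #
    #     with ProcessPoolExecutor() as ex:
    #         assert list(ex.map(abs, [-1, -2, -3], chunksize=2)) == [1, 2, 3]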

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if self._queue_management_thread:
            # Wake up queue management thread
            self._queue_management_thread_wakeup.wakeup()
            if wait:
                self._queue_management_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        if self._call_queue is not None:
            self._call_queue.close()
            if wait:
                self._call_queue.join_thread()
            self._call_queue = None
        self._result_queue = None
        self._processes = None

        if self._queue_management_thread_wakeup:
            self._queue_management_thread_wakeup.close()
            self._queue_management_thread_wakeup = None

    shutdown.__doc__ = _base.Executor.shutdown.__doc__

atexit.register(_python_exit)