# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  WorkItem from the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
"""
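
# A minimal usage sketch (illustration only, not part of this module): user
# code typically drives the machinery described above through the public
# ProcessPoolExecutor API, for example:
#
#     from concurrent.futures import ProcessPoolExecutor
#
#     with ProcessPoolExecutor(max_workers=2) as executor:
#         future = executor.submit(pow, 2, 10)
#         assert future.result() == 1024
#         squares = list(executor.map(pow, range(4), [2] * 4))  # [0, 1, 4, 9]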

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

import atexit
import os
from concurrent.futures import _base
import queue
from queue import Full
import multiprocessing as mp
from multiprocessing.connection import wait
from multiprocessing.queues import Queue
import threading
import weakref
from functools import partial
import itertools
import traceback

# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.

_threads_wakeups = weakref.WeakKeyDictionary()
_global_shutdown = False


class _ThreadWakeup:
    def __init__(self):
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        self._writer.close()
        self._reader.close()

    def wakeup(self):
        self._writer.send_bytes(b"")

    def clear(self):
        while self._reader.poll():
            self._reader.recv_bytes()


def _python_exit():
    global _global_shutdown
    _global_shutdown = True
    items = list(_threads_wakeups.items())
    for _, thread_wakeup in items:
        thread_wakeup.wakeup()
    for t, _ in items:
        t.join()

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1


# Hack to embed stringification of remote traceback in local traceback

class _RemoteTraceback(Exception):
    def __init__(self, tb):
        self.tb = tb
    def __str__(self):
        return self.tb

class _ExceptionWithTraceback:
    def __init__(self, exc, tb):
        tb = traceback.format_exception(type(exc), exc, tb)
        tb = ''.join(tb)
        self.exc = exc
        self.tb = '\n"""\n%s"""' % tb
    def __reduce__(self):
        return _rebuild_exc, (self.exc, self.tb)

def _rebuild_exc(exc, tb):
    exc.__cause__ = _RemoteTraceback(tb)
    return exc

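# A rough sketch of how the hack above behaves (illustration only): pickling
# an _ExceptionWithTraceback calls __reduce__, and unpickling the payload
# calls _rebuild_exc, so the exception that reaches the caller carries the
# formatted remote traceback as its __cause__:
#
#     import pickle
#     try:
#         1 / 0
#     except ZeroDivisionError as e:
#         wrapped = _ExceptionWithTraceback(e, e.__traceback__)
#     exc = pickle.loads(pickle.dumps(wrapped))
#     assert isinstance(exc.__cause__, _RemoteTraceback)
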
class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

class _ResultItem(object):
    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result

class _CallItem(object):
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs


class _SafeQueue(Queue):
    """Safe Queue that sets the exception on the future linked to the failing job."""
    def __init__(self, max_size=0, *, ctx, pending_work_items):
        self.pending_work_items = pending_work_items
        super().__init__(max_size, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        if isinstance(obj, _CallItem):
            tb = traceback.format_exception(type(e), e, e.__traceback__)
            e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
            work_item = self.pending_work_items.pop(obj.work_id, None)
            # work_item can be None if another process terminated. In this case,
            # the queue_manager_thread fails all work_items with BrokenProcessPool
            if work_item is not None:
                work_item.future.set_exception(e)
        else:
            super()._on_queue_feeder_error(e, obj)


def _get_chunks(*iterables, chunksize):
    """ Iterates over zip()ed iterables in chunks. """
    it = zip(*iterables)
    while True:
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield chunk

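# Rough illustration of the chunking above (not a doctest that is actually
# run; output shown for two iterables of length three and chunksize=2):
#
#     >>> list(_get_chunks('ABC', 'xyz', chunksize=2))
#     [(('A', 'x'), ('B', 'y')), (('C', 'z'),)]
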
def _process_chunk(fn, chunk):
    """ Processes a chunk of an iterable passed to map.

    Runs the function passed to map() on a chunk of the
    iterable passed to map.

    This function is run in a separate process.

    """
    return [fn(*args) for args in chunk]


def _sendback_result(result_queue, work_id, result=None, exception=None):
    """Safely send back the given result or exception"""
    try:
        result_queue.put(_ResultItem(work_id, result=result,
                                     exception=exception))
    except BaseException as e:
        exc = _ExceptionWithTraceback(e, e.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=exc))


def _process_worker(call_queue, result_queue, initializer, initargs):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A ctx.Queue of _ResultItems that will be written
            to by the worker.
        initializer: A callable initializer, or None
        initargs: A tuple of args for the initializer
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            # The parent will notice that the process stopped and
            # mark the pool broken
            return
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id, exception=exc)
        else:
            _sendback_result(result_queue, call_item.work_id, result=r)
            del r

        # Liberate the resource as soon as possible, to avoid holding onto
        # open files or shared memory that is not needed anymore
        del call_item


def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Fills call_queue with _WorkItems from pending_work_items.

    This function never blocks.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while True:
        if call_queue.full():
            return
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        else:
            work_item = pending_work_items[work_id]

            if work_item.future.set_running_or_notify_cancel():
                call_queue.put(_CallItem(work_id,
                                         work_item.fn,
                                         work_item.args,
                                         work_item.kwargs),
                               block=True)
            else:
                del pending_work_items[work_id]
                continue


def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue,
                             thread_wakeup):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A dict mapping pids to the ctx.Process instances used as
            workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A ctx.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A ctx.SimpleQueue of _ResultItems generated by the
            process workers.
        thread_wakeup: A _ThreadWakeup to allow waking up the
            queue_manager_thread from the main Thread and avoid deadlocks
            caused by permanently locked queues.
    """
    executor = None

    def shutting_down():
        return (_global_shutdown or executor is None
                or executor._shutdown_thread)

    def shutdown_worker():
        # This is an upper bound on the number of children alive.
        n_children_alive = sum(p.is_alive() for p in processes.values())
        n_children_to_stop = n_children_alive
        n_sentinels_sent = 0
        # Send the right number of sentinels, to make sure all children are
        # properly terminated.
        while n_sentinels_sent < n_children_to_stop and n_children_alive > 0:
            for i in range(n_children_to_stop - n_sentinels_sent):
                try:
                    call_queue.put_nowait(None)
                    n_sentinels_sent += 1
                except Full:
                    break
            n_children_alive = sum(p.is_alive() for p in processes.values())

        # Release the queue's resources as soon as possible.
        call_queue.close()
        # If .join() is not called on the created processes then
        # some ctx.Queue methods may deadlock on Mac OS X.
        for p in processes.values():
            p.join()

    result_reader = result_queue._reader
    wakeup_reader = thread_wakeup._reader
    readers = [result_reader, wakeup_reader]

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        # Wait for a result to be ready in the result_queue while checking
        # that all worker processes are still running, or for a wake-up
        # signal to be sent. The wake-up signals come either from new tasks
        # being submitted, from the executor being shutdown/gc-ed, or from
        # the shutdown of the python interpreter.
        worker_sentinels = [p.sentinel for p in processes.values()]
        ready = wait(readers + worker_sentinels)

        cause = None
        is_broken = True
        if result_reader in ready:
            try:
                result_item = result_reader.recv()
                is_broken = False
            except BaseException as e:
                cause = traceback.format_exception(type(e), e, e.__traceback__)

        elif wakeup_reader in ready:
            is_broken = False
            result_item = None
        thread_wakeup.clear()
        if is_broken:
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = ('A child process terminated '
                                    'abruptly, the process pool is not '
                                    'usable anymore')
                executor._shutdown_thread = True
                executor = None
            bpe = BrokenProcessPool("A process in the process pool was "
                                    "terminated abruptly while the future was "
                                    "running or pending.")
            if cause is not None:
                bpe.__cause__ = _RemoteTraceback(
                    f"\n'''\n{''.join(cause)}'''")
            # All futures in flight must be marked failed
            for work_id, work_item in pending_work_items.items():
                work_item.future.set_exception(bpe)
                # Delete references to object. See issue16284
                del work_item
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return
        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284
                del work_item
            # Delete reference to result_item
            del result_item

        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if shutting_down():
            try:
                # Flag the executor as shutting down as early as possible if it
                # is not gc-ed yet.
                if executor is not None:
                    executor._shutdown_thread = True
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (in
                # result_queue.get()) and be able to send a sentinel again.
                pass
        executor = None


_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
    _system_limits_checked = True
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # undetermined limit, assume that the limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores (%d"
                       " available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)


def _chain_from_iterable_of_lists(iterable):
    """
    Specialized implementation of itertools.chain.from_iterable.
    Each item in *iterable* should be a list. This function is
    careful not to keep references to yielded objects.
    """
    for element in iterable:
        element.reverse()
        while element:
            yield element.pop()

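# A rough illustration of the generator above (illustration only): it
# consumes the inner lists in place, so references are dropped as soon as
# each item has been yielded:
#
#     >>> chunks = [[1, 2], [3]]
#     >>> list(_chain_from_iterable_of_lists(chunks))
#     [1, 2, 3]
#     >>> chunks
#     [[], []]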

class BrokenProcessPool(_base.BrokenExecutor):
    """
    Raised when a process in a ProcessPoolExecutor terminated abruptly
    while a future was in the running state.
    """


class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=()):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
            mp_context: A multiprocessing context to launch the workers. This
                object should provide SimpleQueue, Queue and Process.
            initializer: A callable used to initialize worker processes.
            initargs: A tuple of arguments to pass to the initializer.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.cpu_count() or 1
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")

            self._max_workers = max_workers

        if mp_context is None:
            mp_context = mp.get_context()
        self._mp_context = mp_context

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")
        self._initializer = initializer
        self._initargs = initargs

        # Management thread
        self._queue_management_thread = None

        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._broken = False
        self._queue_count = 0
        self._pending_work_items = {}

        # Create communication channels for the executor
        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = _SafeQueue(
            max_size=queue_size, ctx=self._mp_context,
            pending_work_items=self._pending_work_items)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = mp_context.SimpleQueue()
        self._work_ids = queue.Queue()

        # _ThreadWakeup is a communication channel used to interrupt the wait
        # of the main loop of queue_manager_thread from another thread (e.g.
        # when calling executor.submit or executor.shutdown). We do not use the
        # _result_queue to send the wakeup signal to the queue_manager_thread
        # as it could result in a deadlock if a worker process dies with the
        # _result_queue write lock still acquired.
        self._queue_management_thread_wakeup = _ThreadWakeup()

    def _start_queue_management_thread(self):
        if self._queue_management_thread is None:
            # When the executor gets garbage collected, the weakref callback
            # will wake up the queue management thread so that it can terminate
            # if there is no pending work item.
            def weakref_cb(_,
                           thread_wakeup=self._queue_management_thread_wakeup):
                mp.util.debug('Executor collected: triggering callback for'
                              ' QueueManager wakeup')
                thread_wakeup.wakeup()
            # Start the processes so that their sentinels are known.
            self._adjust_process_count()
            self._queue_management_thread = threading.Thread(
                target=_queue_management_worker,
                args=(weakref.ref(self, weakref_cb),
                      self._processes,
                      self._pending_work_items,
                      self._work_ids,
                      self._call_queue,
                      self._result_queue,
                      self._queue_management_thread_wakeup),
                name="QueueManagerThread")
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            _threads_wakeups[self._queue_management_thread] = \
                self._queue_management_thread_wakeup

    def _adjust_process_count(self):
        for _ in range(len(self._processes), self._max_workers):
            p = self._mp_context.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue,
                      self._initializer,
                      self._initargs))
            p.start()
            self._processes[p.pid] = p

    def submit(*args, **kwargs):
        if len(args) >= 2:
            self, fn, *args = args
        elif not args:
            raise TypeError("descriptor 'submit' of 'ProcessPoolExecutor' object "
                            "needs an argument")
        elif 'fn' in kwargs:
            fn = kwargs.pop('fn')
            self, *args = args
            import warnings
            warnings.warn("Passing 'fn' as keyword argument is deprecated",
                          DeprecationWarning, stacklevel=2)
        else:
            raise TypeError('submit expected at least 1 positional argument, '
                            'got %d' % (len(args)-1))

        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool(self._broken)
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _global_shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._queue_management_thread_wakeup.wakeup()

            self._start_queue_management_thread()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a time.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        results = super().map(partial(_process_chunk, fn),
                              _get_chunks(*iterables, chunksize=chunksize),
                              timeout=timeout)
        return _chain_from_iterable_of_lists(results)

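    # Illustrative only (assumes an executor instance named "executor"): with
    # chunksize=2, the call below ships the argument pairs (2, 10) and (3, 3)
    # to one worker as a single _process_chunk task instead of two:
    #
    #     list(executor.map(pow, [2, 3], [10, 3], chunksize=2))  # [1024, 27]
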
    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if self._queue_management_thread:
            # Wake up queue management thread
            self._queue_management_thread_wakeup.wakeup()
            if wait:
                self._queue_management_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        if self._call_queue is not None:
            self._call_queue.close()
            if wait:
                self._call_queue.join_thread()
            self._call_queue = None
        self._result_queue = None
        self._processes = None

        if self._queue_management_thread_wakeup:
            self._queue_management_thread_wakeup.close()
            self._queue_management_thread_wakeup = None

    shutdown.__doc__ = _base.Executor.shutdown.__doc__

atexit.register(_python_exit)