blob: fc64dbe84bfdc13726e458b75f4b7c2ac1021898 [file] [log] [blame]
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
|          |     +----------+       |        |     +-----------+    |         |
|          |     | ...      |       |        |     | ...       |    |         |
|          |     | 6        |       |        |     | 5, call() |    |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  _WorkItem from the "Work Items" dict: if the work item has been cancelled
  then it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
"""
45
46__author__ = 'Brian Quinlan (brian@sweetapp.com)'
47
48import atexit
Antoine Pitroudd696492011-06-08 17:21:55 +020049import os
Brian Quinlan81c4d362010-09-18 22:35:02 +000050from concurrent.futures import _base
51import queue
Richard Oudkerk1f2eaa92013-10-16 17:06:22 +010052from queue import Full
Brian Quinlan81c4d362010-09-18 22:35:02 +000053import multiprocessing
Richard Oudkerk1f2eaa92013-10-16 17:06:22 +010054from multiprocessing import SimpleQueue
Antoine Pitroubdb1cf12012-03-05 19:28:37 +010055from multiprocessing.connection import wait
Brian Quinlan81c4d362010-09-18 22:35:02 +000056import threading
57import weakref
Antoine Pitrou4aae2762014-10-04 20:20:10 +020058from functools import partial
59import itertools
Brian Quinlan81c4d362010-09-18 22:35:02 +000060
# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.
74
# Registry of queue management threads -> the result queue each one waits on.
# Keys are held weakly, so an entry disappears once its thread object is
# garbage collected.
_threads_queues = weakref.WeakKeyDictionary()

# Set to True by _python_exit(); the queue management thread's shutdown check
# reads this flag.
_shutdown = False


def _python_exit():
    """Flag shutdown, wake every registered management thread, then join them.

    A ``None`` is posted on each registered result queue so that the thread
    blocked on it wakes up; all wake-ups are sent before any join so the
    threads can wind down concurrently.
    """
    global _shutdown
    _shutdown = True
    # Take a snapshot: entries of the weak dictionary can vanish at any time.
    registered = list(_threads_queues.items())
    for _, wakeup_queue in registered:
        wakeup_queue.put(None)
    for manager_thread, _ in registered:
        manager_thread.join()
Brian Quinlan81c4d362010-09-18 22:35:02 +000086
# Number of calls, beyond one per worker process, that may sit in the call
# queue. Lowering it leaves processes idle more often while they wait for
# work; raising it makes Future.cancel() succeed less often, because a future
# whose call is already in the call queue cannot be cancelled.
EXTRA_QUEUED_CALLS = 1
92
93class _WorkItem(object):
94 def __init__(self, future, fn, args, kwargs):
95 self.future = future
96 self.fn = fn
97 self.args = args
98 self.kwargs = kwargs
99
100class _ResultItem(object):
101 def __init__(self, work_id, exception=None, result=None):
102 self.work_id = work_id
103 self.exception = exception
104 self.result = result
105
106class _CallItem(object):
107 def __init__(self, work_id, fn, args, kwargs):
108 self.work_id = work_id
109 self.fn = fn
110 self.args = args
111 self.kwargs = kwargs
112
Antoine Pitrou4aae2762014-10-04 20:20:10 +0200113def _get_chunks(*iterables, chunksize):
114 """ Iterates over zip()ed iterables in chunks. """
115 it = zip(*iterables)
116 while True:
117 chunk = tuple(itertools.islice(it, chunksize))
118 if not chunk:
119 return
120 yield chunk
121
122def _process_chunk(fn, chunk):
123 """ Processes a chunk of an iterable passed to map.
124
125 Runs the function passed to map() on a chunk of the
126 iterable passed to map.
127
128 This function is run in a separate process.
129
130 """
131 return [fn(*args) for args in chunk]
132
Antoine Pitrou27be5da2011-04-12 17:48:46 +0200133def _process_worker(call_queue, result_queue):
Brian Quinlan81c4d362010-09-18 22:35:02 +0000134 """Evaluates calls from call_queue and places the results in result_queue.
135
Georg Brandlfb1720b2010-12-09 18:08:43 +0000136 This worker is run in a separate process.
Brian Quinlan81c4d362010-09-18 22:35:02 +0000137
138 Args:
139 call_queue: A multiprocessing.Queue of _CallItems that will be read and
140 evaluated by the worker.
141 result_queue: A multiprocessing.Queue of _ResultItems that will written
142 to by the worker.
143 shutdown: A multiprocessing.Event that will be set as a signal to the
144 worker that it should exit when call_queue is empty.
145 """
146 while True:
Antoine Pitrou27be5da2011-04-12 17:48:46 +0200147 call_item = call_queue.get(block=True)
148 if call_item is None:
149 # Wake up queue management thread
Antoine Pitroudd696492011-06-08 17:21:55 +0200150 result_queue.put(os.getpid())
Antoine Pitrou27be5da2011-04-12 17:48:46 +0200151 return
Brian Quinlan81c4d362010-09-18 22:35:02 +0000152 try:
Antoine Pitrou27be5da2011-04-12 17:48:46 +0200153 r = call_item.fn(*call_item.args, **call_item.kwargs)
154 except BaseException as e:
155 result_queue.put(_ResultItem(call_item.work_id,
156 exception=e))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000157 else:
Antoine Pitrou27be5da2011-04-12 17:48:46 +0200158 result_queue.put(_ResultItem(call_item.work_id,
159 result=r))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000160
161def _add_call_item_to_queue(pending_work_items,
162 work_ids,
163 call_queue):
164 """Fills call_queue with _WorkItems from pending_work_items.
165
166 This function never blocks.
167
168 Args:
169 pending_work_items: A dict mapping work ids to _WorkItems e.g.
170 {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
171 work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
172 are consumed and the corresponding _WorkItems from
173 pending_work_items are transformed into _CallItems and put in
174 call_queue.
175 call_queue: A multiprocessing.Queue that will be filled with _CallItems
176 derived from _WorkItems.
177 """
178 while True:
179 if call_queue.full():
180 return
181 try:
182 work_id = work_ids.get(block=False)
183 except queue.Empty:
184 return
185 else:
186 work_item = pending_work_items[work_id]
187
188 if work_item.future.set_running_or_notify_cancel():
189 call_queue.put(_CallItem(work_id,
190 work_item.fn,
191 work_item.args,
192 work_item.kwargs),
193 block=True)
194 else:
195 del pending_work_items[work_id]
196 continue
197
def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A dict mapping worker PIDs to the multiprocessing.Process
            instances used as workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A queue of _ResultItems generated by the process
            workers. It also carries ``None`` wake-up markers and the PID of
            each worker that exits cleanly.
    """
    # Strong reference to the executor, held only while checking for shutdown
    # so that this thread does not keep the executor alive.
    executor = None

    def shutting_down():
        return _shutdown or executor is None or executor._shutdown_thread

    def shutdown_worker():
        # This is an upper bound
        nb_children_alive = sum(p.is_alive() for p in processes.values())
        for _ in range(nb_children_alive):
            # May raise queue.Full when the call queue has no free slot.
            call_queue.put_nowait(None)
        # Release the queue's resources as soon as possible.
        call_queue.close()
        # If .join() is not called on the created processes then
        # some multiprocessing.Queue methods may deadlock on Mac OS X.
        for p in processes.values():
            p.join()

    reader = result_queue._reader

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        sentinels = [p.sentinel for p in processes.values()]
        assert sentinels
        # Block until either a result is readable or a worker process ends.
        ready = wait([reader] + sentinels)
        if reader in ready:
            result_item = reader.recv()
        else:
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = True
                executor._shutdown_thread = True
                executor = None
            # All futures in flight must be marked failed
            for work_item in pending_work_items.values():
                work_item.future.set_exception(
                    BrokenProcessPool(
                        "A process in the process pool was "
                        "terminated abruptly while the future was "
                        "running or pending."
                    ))
                # Delete references to object. See issue16284
                del work_item
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return
        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                # Compare against None rather than testing truthiness: an
                # exception object that evaluates as false must still be
                # delivered as an exception, not as a None result.
                if result_item.exception is not None:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284
                del work_item
        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if shutting_down():
            try:
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (in
                # the wait() call above) and be able to send a sentinel again.
                pass
        executor = None
Brian Quinlan81c4d362010-09-18 22:35:02 +0000312
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000313_system_limits_checked = False
314_system_limited = None
315def _check_system_limits():
316 global _system_limits_checked, _system_limited
317 if _system_limits_checked:
318 if _system_limited:
319 raise NotImplementedError(_system_limited)
320 _system_limits_checked = True
321 try:
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000322 nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
323 except (AttributeError, ValueError):
324 # sysconf not available or setting not available
325 return
326 if nsems_max == -1:
Ezio Melottib5bc3532013-08-17 16:11:40 +0300327 # indetermined limit, assume that limit is determined
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000328 # by available memory only
329 return
330 if nsems_max >= 256:
331 # minimum number of semaphores available
332 # according to POSIX
333 return
334 _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
335 raise NotImplementedError(_system_limited)
336
Antoine Pitroudd696492011-06-08 17:21:55 +0200337
class BrokenProcessPool(RuntimeError):
    """Error signalling that a ProcessPoolExecutor can no longer be used.

    It is set on futures that were running or pending when a worker process
    terminated abruptly, and raised by submit() on the broken pool afterwards.
    """
343
344
class ProcessPoolExecutor(_base.Executor):
    """Executor that evaluates submitted calls in a pool of worker processes."""

    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has
                processors (one if that number cannot be determined).

        Raises:
            ValueError: If max_workers is given and is not greater than 0.
        """
        _check_system_limits()

        if max_workers is None:
            max_workers = os.cpu_count() or 1
        elif max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")
        self._max_workers = max_workers

        # The call queue holds slightly more entries than there are processes
        # so that workers do not sit idle, but is kept small because futures
        # whose calls are in the call queue cannot be cancelled.
        queue_capacity = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = multiprocessing.Queue(queue_capacity)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. Killed processes are
        # detected anyway, so those tracebacks are silenced.
        self._call_queue._ignore_epipe = True
        self._result_queue = SimpleQueue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        # Maps worker PIDs to their multiprocessing.Process objects.
        self._processes = {}

        # Shutdown state. _shutdown_lock guards submit() against shutdown().
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._broken = False
        # Next work id to hand out, and the work items not yet completed.
        self._queue_count = 0
        self._pending_work_items = {}

    def _start_queue_management_thread(self):
        """Start the workers and the queue management thread if not running."""
        if self._queue_management_thread is not None:
            return

        # When the executor gets lost, the weakref callback will wake up
        # the queue management thread.
        def weakref_cb(_, q=self._result_queue):
            q.put(None)

        # Start the processes so that their sentinels are known.
        self._adjust_process_count()
        self._queue_management_thread = manager = threading.Thread(
            target=_queue_management_worker,
            args=(weakref.ref(self, weakref_cb),
                  self._processes,
                  self._pending_work_items,
                  self._work_ids,
                  self._call_queue,
                  self._result_queue))
        manager.daemon = True
        manager.start()
        # Register the thread so the exit handler can wake and join it.
        _threads_queues[manager] = self._result_queue

    def _adjust_process_count(self):
        """Start worker processes until there are ``_max_workers`` of them."""
        worker_args = (self._call_queue, self._result_queue)
        for _ in range(len(self._processes), self._max_workers):
            worker = multiprocessing.Process(target=_process_worker,
                                             args=worker_args)
            worker.start()
            self._processes[worker.pid] = worker

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool('A child process terminated '
                    'abruptly, the process pool is not usable anymore')
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            future = _base.Future()
            work_id = self._queue_count
            self._pending_work_items[work_id] = _WorkItem(future, fn,
                                                          args, kwargs)
            self._work_ids.put(work_id)
            self._queue_count = work_id + 1
            # Wake up queue management thread
            self._result_queue.put(None)

            self._start_queue_management_thread()
            return future
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, *iterables).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items will be sent one at a time.

        Returns:
            An iterator equivalent to: map(fn, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            ValueError: If chunksize is less than 1.
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        chunk_runner = partial(_process_chunk, fn)
        chunks = _get_chunks(*iterables, chunksize=chunksize)
        per_chunk_results = super().map(chunk_runner, chunks, timeout=timeout)
        # Each submitted call returns a list of results; flatten them lazily.
        return itertools.chain.from_iterable(per_chunk_results)

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        manager = self._queue_management_thread
        if manager:
            # Wake up queue management thread
            self._result_queue.put(None)
            if wait:
                manager.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        self._call_queue = None
        self._result_queue = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
480
# Install the exit handler that wakes and joins the queue management threads
# when the interpreter exits.
atexit.register(_python_exit)