blob: 07b5225d1dab8082c1c0dff922a4434696dca5d6 [file] [log] [blame]
Brian Quinlan81c4d362010-09-18 22:35:02 +00001# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4"""Implements ProcessPoolExecutor.
5
The following diagram and text describe the data-flow through the system:
7
8|======================= In-process =====================|== Out-of-process ==|
9
+----------+     +------------+     +--------+     +-----------+    +---------+
|          |  => | Work Ids   |  => |        |  => | Call Q    | => |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | ...        |     |        |     | ...       |    |         |
|          |     | 6          |     |        |     | 5, call() |    |         |
|          |     | 7          |     |        |     | ...       |    |         |
| Process  |     | ...        |     | Local  |     +-----------+    | Process |
|  Pool    |     +------------+     | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+
26
27Executor.submit() called:
28- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
29- adds the id of the _WorkItem to the "Work Ids" queue
30
31Local worker thread:
32- reads work ids from the "Work Ids" queue and looks up the corresponding
33 WorkItem from the "Work Items" dict: if the work item has been cancelled then
34 it is simply removed from the dict, otherwise it is repackaged as a
35 _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
36 until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
37 calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
38- reads _ResultItems from "Result Q", updates the future stored in the
39 "Work Items" dict and deletes the dict entry
40
41Process #1..n:
42- reads _CallItems from "Call Q", executes the calls, and puts the resulting
Mark Dickinson5ee24042012-10-20 13:16:49 +010043 _ResultItems in "Result Q"
Brian Quinlan81c4d362010-09-18 22:35:02 +000044"""
45
46__author__ = 'Brian Quinlan (brian@sweetapp.com)'
47
48import atexit
Antoine Pitroudd696492011-06-08 17:21:55 +020049import os
Brian Quinlan81c4d362010-09-18 22:35:02 +000050from concurrent.futures import _base
51import queue
Richard Oudkerk1f2eaa92013-10-16 17:06:22 +010052from queue import Full
Brian Quinlan81c4d362010-09-18 22:35:02 +000053import multiprocessing
Richard Oudkerk1f2eaa92013-10-16 17:06:22 +010054from multiprocessing import SimpleQueue
Antoine Pitroubdb1cf12012-03-05 19:28:37 +010055from multiprocessing.connection import wait
Brian Quinlan81c4d362010-09-18 22:35:02 +000056import threading
57import weakref
58
# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.
72
# Maps each queue management thread to the result queue used to wake it up.
# Weak keys let entries vanish once a thread object is garbage collected.
_threads_queues = weakref.WeakKeyDictionary()
# Set to True once the interpreter has begun shutting down.
_shutdown = False


def _python_exit():
    """Exit handler: flag global shutdown, then wake and join every thread.

    All registered queues receive a None wake-up before any thread is joined,
    so the threads can wind down concurrently rather than one after another.
    """
    global _shutdown
    _shutdown = True
    # Snapshot the registry: it is a weak dictionary and may change under us.
    registered = list(_threads_queues.items())
    for _, wakeup_queue in registered:
        wakeup_queue.put(None)
    for management_thread, _ in registered:
        management_thread.join()
Brian Quinlan81c4d362010-09-18 22:35:02 +000084
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work, while a larger number will make Future.cancel() succeed less frequently
# (futures already in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1
90
class _WorkItem:
    """A submitted call bundled with the future that reports its outcome."""

    def __init__(self, future, fn, args, kwargs):
        self.future, self.fn = future, fn
        self.args, self.kwargs = args, kwargs
97
class _ResultItem:
    """Outcome of one call: its work id plus either an exception or a result."""

    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception, self.result = exception, result
103
class _CallItem:
    """A call identified by its work id, together with its arguments."""

    def __init__(self, work_id, fn, args, kwargs):
        self.work_id, self.fn = work_id, fn
        self.args, self.kwargs = args, kwargs
110
def _process_worker(call_queue, result_queue):
    """Run calls taken from call_queue and report outcomes on result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A queue of _CallItems that will be read and evaluated by
            the worker. A None item tells the worker to exit.
        result_queue: A queue to which the worker writes one _ResultItem per
            evaluated call, and its own pid just before exiting.
    """
    while True:
        item = call_queue.get(block=True)
        if item is None:
            # Exit request: announce our pid, which also wakes up the queue
            # management thread.
            result_queue.put(os.getpid())
            return
        try:
            value = item.fn(*item.args, **item.kwargs)
        except BaseException as exc:
            # Every failure, including non-Exception ones, is sent back to
            # the parent instead of killing the worker.
            result_queue.put(_ResultItem(item.work_id, exception=exc))
        else:
            result_queue.put(_ResultItem(item.work_id, result=value))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000138
def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Move queued work into call_queue until it is full or work runs out.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while not call_queue.full():
        try:
            next_id = work_ids.get(block=False)
        except queue.Empty:
            # Nothing left to schedule.
            return
        item = pending_work_items[next_id]

        if not item.future.set_running_or_notify_cancel():
            # The future was cancelled before it could run: forget it and
            # move on to the next work id.
            del pending_work_items[next_id]
            continue

        # The queue was just seen to be non-full, so this put is not expected
        # to wait even though block=True is passed.
        call_queue.put(_CallItem(next_id,
                                 item.fn,
                                 item.args,
                                 item.kwargs),
                       block=True)
175
def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread. Each iteration it tops up the
    call queue, waits for either a message on the result queue or a worker
    process sentinel, handles what it received, and then checks whether it
    should shut down.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A dict mapping worker pids to the multiprocessing.Process
            instances used as workers. Entries are removed as workers report
            a clean exit.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A queue of _ResultItems generated by the process
            workers. It also carries worker pids (clean worker exit) and None
            (plain wake-up); its _reader connection is waited on directly.
    """
    # Strong reference to the executor, held only while shutdown is being
    # evaluated and reset to None afterwards.
    executor = None

    def shutting_down():
        return _shutdown or executor is None or executor._shutdown_thread

    def shutdown_worker():
        # This is an upper bound
        nb_children_alive = sum(p.is_alive() for p in processes.values())
        # One None per live worker; put_nowait may raise queue.Full.
        for i in range(0, nb_children_alive):
            call_queue.put_nowait(None)
        # Release the queue's resources as soon as possible.
        call_queue.close()
        # If .join() is not called on the created processes then
        # some multiprocessing.Queue methods may deadlock on Mac OS X.
        for p in processes.values():
            p.join()

    reader = result_queue._reader

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        sentinels = [p.sentinel for p in processes.values()]
        assert sentinels
        # Block until a result/wake-up message arrives or a worker process
        # sentinel becomes ready.
        ready = wait([reader] + sentinels)
        if reader in ready:
            result_item = reader.recv()
        else:
            # Only a process sentinel is ready: a worker ended without
            # sending anything on the result queue.
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = True
                executor._shutdown_thread = True
                executor = None
            # All futures in flight must be marked failed
            for work_id, work_item in pending_work_items.items():
                work_item.future.set_exception(
                    BrokenProcessPool(
                        "A process in the process pool was "
                        "terminated abruptly while the future was "
                        "running or pending."
                    ))
                # Delete references to object. See issue16284
                del work_item
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return
        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284
                del work_item
        # A None result_item is a pure wake-up and carries no payload.
        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if shutting_down():
            try:
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (by
                # the wait() on the result queue's reader) and be able to
                # send a sentinel again.
                pass
        # Drop the strong reference so this thread does not keep the executor
        # alive while blocked in the next wait().
        executor = None
Brian Quinlan81c4d362010-09-18 22:35:02 +0000290
# True once the semaphore limit has been examined (whatever the outcome).
_system_limits_checked = False
# Error message describing the limitation, or None if none was found.
_system_limited = None


def _check_system_limits():
    """Verify that the system provides enough semaphores for a process pool.

    The outcome is cached in module globals, so os.sysconf() is queried at
    most once per process.

    Raises:
        NotImplementedError: If SC_SEM_NSEMS_MAX reports fewer than 256
            semaphores. The same error is raised again on every later call.
    """
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
        # Already checked and found adequate: do not query the system again.
        return
    _system_limits_checked = True
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # indeterminate limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
    raise NotImplementedError(_system_limited)
314
Antoine Pitroudd696492011-06-08 17:21:55 +0200315
class BrokenProcessPool(RuntimeError):
    """Error signalling that a ProcessPoolExecutor worker died unexpectedly.

    It is set on futures that were running or pending when a worker process
    terminated abruptly, and raised by later attempts to submit work.
    """
321
322
class ProcessPoolExecutor(_base.Executor):
    """Executor that runs submitted calls in a pool of worker processes."""

    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.

        Raises:
            ValueError: If max_workers is less than or equal to 0.
            NotImplementedError: If _check_system_limits() finds that the
                system provides too few semaphores.
        """
        # Reject a non-positive pool size up front: with no worker processes
        # the queue management thread has no sentinels to wait on and
        # submitted futures would never complete.
        if max_workers is not None and max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.cpu_count() or 1
        else:
            self._max_workers = max_workers

        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        self._call_queue = multiprocessing.Queue(self._max_workers +
                                                 EXTRA_QUEUED_CALLS)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = SimpleQueue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._broken = False
        # Next work id to hand out; also the key used in _pending_work_items.
        self._queue_count = 0
        self._pending_work_items = {}

    def _start_queue_management_thread(self):
        """Start the worker processes and the queue management thread once."""
        # When the executor gets lost, the weakref callback will wake up
        # the queue management thread.
        def weakref_cb(_, q=self._result_queue):
            q.put(None)
        if self._queue_management_thread is None:
            # Start the processes so that their sentinels are known.
            self._adjust_process_count()
            self._queue_management_thread = threading.Thread(
                target=_queue_management_worker,
                args=(weakref.ref(self, weakref_cb),
                      self._processes,
                      self._pending_work_items,
                      self._work_ids,
                      self._call_queue,
                      self._result_queue))
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            # Register the thread so the module exit handler can wake and
            # join it.
            _threads_queues[self._queue_management_thread] = self._result_queue

    def _adjust_process_count(self):
        """Start worker processes until there are _max_workers of them."""
        for _ in range(len(self._processes), self._max_workers):
            p = multiprocessing.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue))
            p.start()
            self._processes[p.pid] = p

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool('A child process terminated '
                    'abruptly, the process pool is not usable anymore')
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._result_queue.put(None)

            self._start_queue_management_thread()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if self._queue_management_thread:
            # Wake up queue management thread
            self._result_queue.put(None)
            if wait:
                self._queue_management_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        self._call_queue = None
        self._result_queue = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
426
# Run _python_exit() at interpreter exit so that every registered queue
# management thread is woken up and joined.
atexit.register(_python_exit)