blob: 3c20b934c25432e0f5bc8669faa9aab53576597b [file] [log] [blame]
Brian Quinlan81c4d362010-09-18 22:35:02 +00001# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4"""Implements ProcessPoolExecutor.
5
The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
|          |     +----------+       |        |     +-----------+    |         |
|          |     | ...      |       |        |     | ...       |    |         |
|          |     | 6        |       |        |     | 5, call() |    |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  WorkItem from the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
Brian Quinlan81c4d362010-09-18 22:35:02 +000044"""
45
46__author__ = 'Brian Quinlan (brian@sweetapp.com)'
47
48import atexit
Antoine Pitroudd696492011-06-08 17:21:55 +020049import os
Brian Quinlan81c4d362010-09-18 22:35:02 +000050from concurrent.futures import _base
51import queue
52import multiprocessing
Antoine Pitroubdb1cf12012-03-05 19:28:37 +010053from multiprocessing.queues import SimpleQueue, Full
54from multiprocessing.connection import wait
Brian Quinlan81c4d362010-09-18 22:35:02 +000055import threading
56import weakref
57
# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.
71
# Maps each queue-management thread to the result queue that wakes it up.
# Keys are held weakly, so a thread that has gone away drops out on its own.
_threads_queues = weakref.WeakKeyDictionary()

# Set to True once _python_exit() has started running.
_shutdown = False


def _python_exit():
    """Wake every registered queue-management thread and wait for it to end.

    Sets the module-level _shutdown flag, posts a None wake-up message on the
    queue registered for each thread in _threads_queues, and then joins each
    of those threads.
    """
    global _shutdown
    _shutdown = True
    # Snapshot the weak dictionary so that entries disappearing while we
    # work cannot disturb the two passes below.
    registered = list(_threads_queues.items())
    # First pass: wake everybody up before blocking on anyone.
    for _, wakeup_queue in registered:
        wakeup_queue.put(None)
    # Second pass: wait for each thread to finish.
    for management_thread, _ in registered:
        management_thread.join()
Brian Quinlan81c4d362010-09-18 22:35:02 +000083
# Number of calls, beyond one per worker process, that may wait in the call
# queue at the same time. A lower value leaves processes idle more often
# while they wait for work; a higher value makes Future.cancel() succeed less
# often, because a call that has reached the call queue can no longer be
# cancelled.
EXTRA_QUEUED_CALLS = 1
89
class _WorkItem:
    """A submitted call together with the future that reports its outcome."""

    def __init__(self, future, fn, args, kwargs):
        # The future handed back to the caller of submit().
        self.future = future
        # The callable and the arguments it will be invoked with.
        self.fn, self.args, self.kwargs = fn, args, kwargs
96
class _ResultItem:
    """The outcome of one call, tagged with the id of the work it belongs to.

    Either `exception` or `result` carries the outcome; both default to None.
    """

    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception, self.result = exception, result
102
class _CallItem:
    """A call to execute, tagged with the id of the work item it came from."""

    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        # The callable and the arguments it will be invoked with.
        self.fn, self.args, self.kwargs = fn, args, kwargs
109
def _process_worker(call_queue, result_queue):
    """Run calls taken from call_queue and report outcomes on result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A queue of _CallItems to evaluate. A None item tells the
            worker to stop.
        result_queue: A queue that receives one _ResultItem per evaluated
            call and, when the worker stops, the worker's process id.
    """
    while True:
        job = call_queue.get(block=True)
        if job is None:
            # Stop request.
            break
        try:
            value = job.fn(*job.args, **job.kwargs)
        except BaseException as exc:
            # Anything the call raises is shipped back instead of killing
            # the worker.
            result_queue.put(_ResultItem(job.work_id,
                                         exception=exc))
        else:
            result_queue.put(_ResultItem(job.work_id,
                                         result=value))
    # Wake up queue management thread: posting our pid tells it that this
    # worker is exiting on request.
    result_queue.put(os.getpid())
Brian Quinlan81c4d362010-09-18 22:35:02 +0000137
def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Move queued work into call_queue until it is full or work runs out.

    The function stops as soon as call_queue reports that it is full or
    work_ids has nothing left, so it is not meant to block.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Each id
            taken from it is looked up in pending_work_items; the matching
            _WorkItem is turned into a _CallItem and put in call_queue.
        call_queue: A multiprocessing.Queue that will be filled with
            _CallItems derived from _WorkItems.
    """
    while not call_queue.full():
        try:
            next_id = work_ids.get(block=False)
        except queue.Empty:
            # Nothing left to schedule.
            return

        item = pending_work_items[next_id]
        if not item.future.set_running_or_notify_cancel():
            # The future was cancelled before it could start: forget it and
            # move on to the next id.
            del pending_work_items[next_id]
            continue

        call_queue.put(_CallItem(next_id,
                                 item.fn,
                                 item.args,
                                 item.kwargs),
                       block=True)
174
def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """Shuttle work and results between this process and the workers.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that
            owns this thread. Used to determine if the ProcessPoolExecutor
            has been garbage collected and that this function can exit.
        processes: A dict mapping process ids to the multiprocessing.Process
            instances used as workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with
            _CallItems derived from _WorkItems for processing by the process
            workers.
        result_queue: A queue of _ResultItems generated by the process
            workers. None is also posted on it as a plain wake-up message,
            and a worker posts its pid (an int) when it exits on request.
    """
    # Strong reference to the executor. It is only held for the shutdown
    # check at the end of an iteration and reset to None afterwards.
    executor = None

    def shutting_down():
        return _shutdown or executor is None or executor._shutdown_thread

    def shutdown_worker():
        # This is an upper bound on the number of sentinels still needed.
        alive_count = sum(proc.is_alive() for proc in processes.values())
        for _ in range(alive_count):
            call_queue.put_nowait(None)
        # Release the queue's resources as soon as possible.
        call_queue.close()
        # If .join() is not called on the created processes then
        # some multiprocessing.Queue methods may deadlock on Mac OS X.
        for proc in processes.values():
            proc.join()

    reader = result_queue._reader

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        # Wait for either a message on the result queue or the death of a
        # worker process.
        sentinels = [proc.sentinel for proc in processes.values()]
        assert sentinels
        ready = wait([reader] + sentinels)

        if reader not in ready:
            # Only a process sentinel fired: a worker died unexpectedly.
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = True
                executor._shutdown_thread = True
            executor = None
            # All futures in flight must be marked failed
            for doomed in pending_work_items.values():
                doomed.future.set_exception(
                    BrokenProcessPool(
                        "A process in the process pool was "
                        "terminated abruptly while the future was "
                        "running or pending."
                    ))
                # Delete references to object. See issue16284
                del doomed
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for proc in processes.values():
                proc.terminate()
            shutdown_worker()
            return

        result_item = reader.recv()
        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert shutting_down()
            exited = processes.pop(result_item)
            exited.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284
                del work_item

        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if shutting_down():
            try:
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up by a
                # later message on the result queue and be able to send a
                # sentinel again.
                pass
        executor = None
Brian Quinlan81c4d362010-09-18 22:35:02 +0000289
# Whether _check_system_limits() has already examined the platform.
_system_limits_checked = False
# Error message recorded by the first check when the platform is too
# limited, or None when no problem was found.
_system_limited = None


def _check_system_limits():
    """Raise NotImplementedError if the platform offers too few semaphores.

    The platform is examined only once: the verdict is stored in the
    module-level _system_limits_checked / _system_limited pair and replayed
    on every later call without querying the system again.

    Raises:
        NotImplementedError: If os.sysconf("SC_SEM_NSEMS_MAX") reports a
            determinate limit below 256.
    """
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
        # A previous check found no problem; do not query the system again.
        return
    _system_limits_checked = True
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # indeterminate limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores "
                       "(%d available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)
313
Antoine Pitroudd696492011-06-08 17:21:55 +0200314
class BrokenProcessPool(RuntimeError):
    """Error reported when a ProcessPoolExecutor worker dies unexpectedly.

    It is set on the futures that were running or pending when a process in
    the pool terminated abruptly, and raised by submit() once the pool has
    been marked broken.
    """
320
321
class ProcessPoolExecutor(_base.Executor):
    """Executor that evaluates submitted calls in a pool of worker processes.

    Worker processes and the queue management thread are started lazily by
    the first call to submit().
    """

    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has
                processors.

        Raises:
            ValueError: If max_workers is given but is not greater than 0.
            NotImplementedError: If _check_system_limits() finds the
                platform unsuitable.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = multiprocessing.cpu_count()
        else:
            # A pool without workers can never run anything: no process would
            # be started and the queue management thread would have nothing
            # to wait on, so submitted futures would never complete.
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")
            self._max_workers = max_workers

        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        self._call_queue = multiprocessing.Queue(self._max_workers +
                                                 EXTRA_QUEUED_CALLS)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = SimpleQueue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        # Set by the queue management thread when a worker dies abruptly.
        self._broken = False
        # Next work id to hand out.
        self._queue_count = 0
        # Map of work ids to the _WorkItems that have not completed yet.
        self._pending_work_items = {}

    def _start_queue_management_thread(self):
        """Start the worker processes and the management thread, once."""
        # When the executor gets lost, the weakref callback will wake up
        # the queue management thread.
        def weakref_cb(_, q=self._result_queue):
            q.put(None)
        if self._queue_management_thread is None:
            # Start the processes so that their sentinels are known.
            self._adjust_process_count()
            self._queue_management_thread = threading.Thread(
                    target=_queue_management_worker,
                    args=(weakref.ref(self, weakref_cb),
                          self._processes,
                          self._pending_work_items,
                          self._work_ids,
                          self._call_queue,
                          self._result_queue))
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            # Register the thread so the exit handler can wake and join it.
            _threads_queues[self._queue_management_thread] = self._result_queue

    def _adjust_process_count(self):
        """Start worker processes until max_workers of them exist."""
        for _ in range(len(self._processes), self._max_workers):
            p = multiprocessing.Process(
                    target=_process_worker,
                    args=(self._call_queue,
                          self._result_queue))
            p.start()
            self._processes[p.pid] = p

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool('A child process terminated '
                    'abruptly, the process pool is not usable anymore')
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            # Record the work item first, then publish its id for scheduling.
            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._result_queue.put(None)

            self._start_queue_management_thread()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if self._queue_management_thread:
            # Wake up queue management thread
            self._result_queue.put(None)
            if wait:
                self._queue_management_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        self._call_queue = None
        self._result_queue = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
425
# Have the interpreter call _python_exit() on exit so that the queue
# management threads are woken up and joined before shutdown proceeds.
atexit.register(_python_exit)