blob: d3bbe2c5e68f5534bb47b97f075c3350e8eee4d8 [file] [log] [blame]
Brian Quinlan81c4d362010-09-18 22:35:02 +00001# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4"""Implements ProcessPoolExecutor.
5
The following diagram and text describe the data-flow through the system:
7
8|======================= In-process =====================|== Out-of-process ==|
9
10+----------+ +----------+ +--------+ +-----------+ +---------+
11| | => | Work Ids | => | | => | Call Q | => | |
12| | +----------+ | | +-----------+ | |
13| | | ... | | | | ... | | |
14| | | 6 | | | | 5, call() | | |
15| | | 7 | | | | ... | | |
16| Process | | ... | | Local | +-----------+ | Process |
17| Pool | +----------+ | Worker | | #1..n |
18| Executor | | Thread | | |
19| | +----------- + | | +-----------+ | |
20| | <=> | Work Items | <=> | | <= | Result Q | <= | |
21| | +------------+ | | +-----------+ | |
22| | | 6: call() | | | | ... | | |
23| | | future | | | | 4, result | | |
24| | | ... | | | | 3, except | | |
25+----------+ +------------+ +--------+ +-----------+ +---------+
26
27Executor.submit() called:
28- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
29- adds the id of the _WorkItem to the "Work Ids" queue
30
31Local worker thread:
32- reads work ids from the "Work Ids" queue and looks up the corresponding
33 WorkItem from the "Work Items" dict: if the work item has been cancelled then
34 it is simply removed from the dict, otherwise it is repackaged as a
35 _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
36 until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
37 calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
38- reads _ResultItems from "Result Q", updates the future stored in the
39 "Work Items" dict and deletes the dict entry
40
41Process #1..n:
42- reads _CallItems from "Call Q", executes the calls, and puts the resulting
 _ResultItems in "Result Q"
44"""
45
46__author__ = 'Brian Quinlan (brian@sweetapp.com)'
47
48import atexit
49from concurrent.futures import _base
50import queue
51import multiprocessing
52import threading
53import weakref
54
55# Workers are created as daemon threads and processes. This is done to allow the
56# interpreter to exit when there are still idle processes in a
57# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
58# allowing workers to die with the interpreter has two undesirable properties:
# - The workers would still be running during interpreter shutdown,
60# meaning that they would fail in unpredictable ways.
61# - The workers could be killed while evaluating a work item, which could
62# be bad if the callable being evaluated has external side-effects e.g.
63# writing to a file.
64#
65# To work around this problem, an exit handler is installed which tells the
66# workers to exit when their work queues are empty and then waits until the
67# threads/processes finish.
68
Antoine Pitrouc13d4542011-03-26 19:29:44 +010069_threads_queues = weakref.WeakKeyDictionary()
Brian Quinlan81c4d362010-09-18 22:35:02 +000070_shutdown = False
71
def _python_exit():
    """Atexit hook: wake every queue-management thread and wait for each.

    Sets the module-level _shutdown flag so management threads know the
    interpreter is going away, then wakes them all with a None sentinel
    before joining any of them, so shutdown proceeds in parallel.
    """
    global _shutdown
    _shutdown = True
    # Snapshot the weak dict: it must not change size while we iterate.
    entries = list(_threads_queues.items())
    for _, wakeup_queue in entries:
        wakeup_queue.put(None)
    for thread, _ in entries:
        thread.join()
Brian Quinlan81c4d362010-09-18 22:35:02 +000080
81# Controls how many more calls than processes will be queued in the call queue.
82# A smaller number will mean that processes spend more time idle waiting for
83# work while a larger number will make Future.cancel() succeed less frequently
84# (Futures in the call queue cannot be cancelled).
85EXTRA_QUEUED_CALLS = 1
86
87class _WorkItem(object):
88 def __init__(self, future, fn, args, kwargs):
89 self.future = future
90 self.fn = fn
91 self.args = args
92 self.kwargs = kwargs
93
94class _ResultItem(object):
95 def __init__(self, work_id, exception=None, result=None):
96 self.work_id = work_id
97 self.exception = exception
98 self.result = result
99
100class _CallItem(object):
101 def __init__(self, work_id, fn, args, kwargs):
102 self.work_id = work_id
103 self.fn = fn
104 self.args = args
105 self.kwargs = kwargs
106
Antoine Pitrou27be5da2011-04-12 17:48:46 +0200107def _process_worker(call_queue, result_queue):
Brian Quinlan81c4d362010-09-18 22:35:02 +0000108 """Evaluates calls from call_queue and places the results in result_queue.
109
Georg Brandlfb1720b2010-12-09 18:08:43 +0000110 This worker is run in a separate process.
Brian Quinlan81c4d362010-09-18 22:35:02 +0000111
112 Args:
113 call_queue: A multiprocessing.Queue of _CallItems that will be read and
114 evaluated by the worker.
115 result_queue: A multiprocessing.Queue of _ResultItems that will written
116 to by the worker.
117 shutdown: A multiprocessing.Event that will be set as a signal to the
118 worker that it should exit when call_queue is empty.
119 """
120 while True:
Antoine Pitrou27be5da2011-04-12 17:48:46 +0200121 call_item = call_queue.get(block=True)
122 if call_item is None:
123 # Wake up queue management thread
124 result_queue.put(None)
125 return
Brian Quinlan81c4d362010-09-18 22:35:02 +0000126 try:
Antoine Pitrou27be5da2011-04-12 17:48:46 +0200127 r = call_item.fn(*call_item.args, **call_item.kwargs)
128 except BaseException as e:
129 result_queue.put(_ResultItem(call_item.work_id,
130 exception=e))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000131 else:
Antoine Pitrou27be5da2011-04-12 17:48:46 +0200132 result_queue.put(_ResultItem(call_item.work_id,
133 result=r))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000134
135def _add_call_item_to_queue(pending_work_items,
136 work_ids,
137 call_queue):
138 """Fills call_queue with _WorkItems from pending_work_items.
139
140 This function never blocks.
141
142 Args:
143 pending_work_items: A dict mapping work ids to _WorkItems e.g.
144 {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
145 work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
146 are consumed and the corresponding _WorkItems from
147 pending_work_items are transformed into _CallItems and put in
148 call_queue.
149 call_queue: A multiprocessing.Queue that will be filled with _CallItems
150 derived from _WorkItems.
151 """
152 while True:
153 if call_queue.full():
154 return
155 try:
156 work_id = work_ids.get(block=False)
157 except queue.Empty:
158 return
159 else:
160 work_item = pending_work_items[work_id]
161
162 if work_item.future.set_running_or_notify_cancel():
163 call_queue.put(_CallItem(work_id,
164 work_item.fn,
165 work_item.args,
166 work_item.kwargs),
167 block=True)
168 else:
169 del pending_work_items[work_id]
170 continue
171
Antoine Pitroub87a56a2011-05-03 16:34:42 +0200172def _queue_management_worker(executor_reference,
173 processes,
174 pending_work_items,
175 work_ids_queue,
176 call_queue,
177 result_queue):
Brian Quinlan81c4d362010-09-18 22:35:02 +0000178 """Manages the communication between this process and the worker processes.
179
180 This function is run in a local thread.
181
182 Args:
183 executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
184 this thread. Used to determine if the ProcessPoolExecutor has been
185 garbage collected and that this function can exit.
186 process: A list of the multiprocessing.Process instances used as
187 workers.
188 pending_work_items: A dict mapping work ids to _WorkItems e.g.
189 {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
190 work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
191 call_queue: A multiprocessing.Queue that will be filled with _CallItems
192 derived from _WorkItems for processing by the process workers.
193 result_queue: A multiprocessing.Queue of _ResultItems generated by the
194 process workers.
Brian Quinlan81c4d362010-09-18 22:35:02 +0000195 """
Antoine Pitrouc13d4542011-03-26 19:29:44 +0100196 nb_shutdown_processes = 0
197 def shutdown_one_process():
198 """Tell a worker to terminate, which will in turn wake us again"""
199 nonlocal nb_shutdown_processes
200 call_queue.put(None)
201 nb_shutdown_processes += 1
Brian Quinlan81c4d362010-09-18 22:35:02 +0000202 while True:
203 _add_call_item_to_queue(pending_work_items,
204 work_ids_queue,
205 call_queue)
206
Antoine Pitrou27be5da2011-04-12 17:48:46 +0200207 result_item = result_queue.get(block=True)
208 if result_item is not None:
209 work_item = pending_work_items[result_item.work_id]
210 del pending_work_items[result_item.work_id]
Brian Quinlan81c4d362010-09-18 22:35:02 +0000211
Antoine Pitrou27be5da2011-04-12 17:48:46 +0200212 if result_item.exception:
213 work_item.future.set_exception(result_item.exception)
214 else:
215 work_item.future.set_result(result_item.result)
Ross Lagerwall66e2fb62012-01-08 08:29:40 +0200216 # Check whether we should start shutting down.
Antoine Pitrouc13d4542011-03-26 19:29:44 +0100217 executor = executor_reference()
218 # No more work items can be added if:
219 # - The interpreter is shutting down OR
220 # - The executor that owns this worker has been collected OR
221 # - The executor that owns this worker has been shutdown.
222 if _shutdown or executor is None or executor._shutdown_thread:
223 # Since no new work items can be added, it is safe to shutdown
224 # this thread if there are no pending work items.
225 if not pending_work_items:
Antoine Pitrouc13d4542011-03-26 19:29:44 +0100226 while nb_shutdown_processes < len(processes):
227 shutdown_one_process()
228 # If .join() is not called on the created processes then
229 # some multiprocessing.Queue methods may deadlock on Mac OS
230 # X.
231 for p in processes:
232 p.join()
Antoine Pitrou0b447952011-07-16 23:52:02 +0200233 call_queue.close()
Antoine Pitrouc13d4542011-03-26 19:29:44 +0100234 return
Antoine Pitrouc13d4542011-03-26 19:29:44 +0100235 del executor
Brian Quinlan81c4d362010-09-18 22:35:02 +0000236
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000237_system_limits_checked = False
238_system_limited = None
239def _check_system_limits():
240 global _system_limits_checked, _system_limited
241 if _system_limits_checked:
242 if _system_limited:
243 raise NotImplementedError(_system_limited)
244 _system_limits_checked = True
245 try:
246 import os
247 nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
248 except (AttributeError, ValueError):
249 # sysconf not available or setting not available
250 return
251 if nsems_max == -1:
252 # indetermine limit, assume that limit is determined
253 # by available memory only
254 return
255 if nsems_max >= 256:
256 # minimum number of semaphores available
257 # according to POSIX
258 return
259 _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
260 raise NotImplementedError(_system_limited)
261
Brian Quinlan81c4d362010-09-18 22:35:02 +0000262class ProcessPoolExecutor(_base.Executor):
263 def __init__(self, max_workers=None):
264 """Initializes a new ProcessPoolExecutor instance.
265
266 Args:
267 max_workers: The maximum number of processes that can be used to
268 execute the given calls. If None or not given then as many
269 worker processes will be created as the machine has processors.
270 """
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000271 _check_system_limits()
Brian Quinlan81c4d362010-09-18 22:35:02 +0000272
273 if max_workers is None:
274 self._max_workers = multiprocessing.cpu_count()
275 else:
276 self._max_workers = max_workers
277
278 # Make the call queue slightly larger than the number of processes to
279 # prevent the worker processes from idling. But don't make it too big
280 # because futures in the call queue cannot be cancelled.
281 self._call_queue = multiprocessing.Queue(self._max_workers +
282 EXTRA_QUEUED_CALLS)
283 self._result_queue = multiprocessing.Queue()
284 self._work_ids = queue.Queue()
285 self._queue_management_thread = None
286 self._processes = set()
287
288 # Shutdown is a two-step process.
289 self._shutdown_thread = False
Brian Quinlan81c4d362010-09-18 22:35:02 +0000290 self._shutdown_lock = threading.Lock()
291 self._queue_count = 0
292 self._pending_work_items = {}
293
294 def _start_queue_management_thread(self):
Antoine Pitrouc13d4542011-03-26 19:29:44 +0100295 # When the executor gets lost, the weakref callback will wake up
296 # the queue management thread.
297 def weakref_cb(_, q=self._result_queue):
298 q.put(None)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000299 if self._queue_management_thread is None:
300 self._queue_management_thread = threading.Thread(
Antoine Pitroub87a56a2011-05-03 16:34:42 +0200301 target=_queue_management_worker,
Antoine Pitrouc13d4542011-03-26 19:29:44 +0100302 args=(weakref.ref(self, weakref_cb),
Brian Quinlan81c4d362010-09-18 22:35:02 +0000303 self._processes,
304 self._pending_work_items,
305 self._work_ids,
306 self._call_queue,
Antoine Pitrou27be5da2011-04-12 17:48:46 +0200307 self._result_queue))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000308 self._queue_management_thread.daemon = True
309 self._queue_management_thread.start()
Antoine Pitrouc13d4542011-03-26 19:29:44 +0100310 _threads_queues[self._queue_management_thread] = self._result_queue
Brian Quinlan81c4d362010-09-18 22:35:02 +0000311
312 def _adjust_process_count(self):
313 for _ in range(len(self._processes), self._max_workers):
314 p = multiprocessing.Process(
315 target=_process_worker,
316 args=(self._call_queue,
Antoine Pitrou27be5da2011-04-12 17:48:46 +0200317 self._result_queue))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000318 p.start()
319 self._processes.add(p)
320
321 def submit(self, fn, *args, **kwargs):
322 with self._shutdown_lock:
323 if self._shutdown_thread:
324 raise RuntimeError('cannot schedule new futures after shutdown')
325
326 f = _base.Future()
327 w = _WorkItem(f, fn, args, kwargs)
328
329 self._pending_work_items[self._queue_count] = w
330 self._work_ids.put(self._queue_count)
331 self._queue_count += 1
Antoine Pitrouc13d4542011-03-26 19:29:44 +0100332 # Wake up queue management thread
333 self._result_queue.put(None)
Brian Quinlan81c4d362010-09-18 22:35:02 +0000334
335 self._start_queue_management_thread()
336 self._adjust_process_count()
337 return f
338 submit.__doc__ = _base.Executor.submit.__doc__
339
340 def shutdown(self, wait=True):
341 with self._shutdown_lock:
342 self._shutdown_thread = True
Antoine Pitrouc13d4542011-03-26 19:29:44 +0100343 if self._queue_management_thread:
344 # Wake up queue management thread
345 self._result_queue.put(None)
346 if wait:
Brian Quinlan81c4d362010-09-18 22:35:02 +0000347 self._queue_management_thread.join()
348 # To reduce the risk of openning too many files, remove references to
349 # objects that use file descriptors.
350 self._queue_management_thread = None
351 self._call_queue = None
352 self._result_queue = None
Brian Quinlan81c4d362010-09-18 22:35:02 +0000353 self._processes = None
354 shutdown.__doc__ = _base.Executor.shutdown.__doc__
355
# Ensure management threads and worker processes are woken and joined when
# the interpreter exits (see the comment block near the top of this module).
atexit.register(_python_exit)