# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

4"""Implements ProcessPoolExecutor.
5
6The follow diagram and text describe the data-flow through the system:
7
8|======================= In-process =====================|== Out-of-process ==|
9
10+----------+ +----------+ +--------+ +-----------+ +---------+
11| | => | Work Ids | => | | => | Call Q | => | |
12| | +----------+ | | +-----------+ | |
13| | | ... | | | | ... | | |
14| | | 6 | | | | 5, call() | | |
15| | | 7 | | | | ... | | |
16| Process | | ... | | Local | +-----------+ | Process |
17| Pool | +----------+ | Worker | | #1..n |
18| Executor | | Thread | | |
19| | +----------- + | | +-----------+ | |
20| | <=> | Work Items | <=> | | <= | Result Q | <= | |
21| | +------------+ | | +-----------+ | |
22| | | 6: call() | | | | ... | | |
23| | | future | | | | 4, result | | |
24| | | ... | | | | 3, except | | |
25+----------+ +------------+ +--------+ +-----------+ +---------+
26
27Executor.submit() called:
28- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
29- adds the id of the _WorkItem to the "Work Ids" queue
30
31Local worker thread:
32- reads work ids from the "Work Ids" queue and looks up the corresponding
33 WorkItem from the "Work Items" dict: if the work item has been cancelled then
34 it is simply removed from the dict, otherwise it is repackaged as a
35 _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
36 until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
37 calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
38- reads _ResultItems from "Result Q", updates the future stored in the
39 "Work Items" dict and deletes the dict entry
40
41Process #1..n:
42- reads _CallItems from "Call Q", executes the calls, and puts the resulting
43 _ResultItems in "Request Q"
44"""

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

import atexit
from concurrent.futures import _base
import queue
import multiprocessing
import threading
import weakref

# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.

_thread_references = set()
_shutdown = False

def _python_exit():
    global _shutdown
    _shutdown = True
    for thread_reference in _thread_references:
        thread = thread_reference()
        if thread is not None:
            thread.join()

def _remove_dead_thread_references():
    """Remove inactive threads from _thread_references.

    Should be called periodically to prevent memory leaks in scenarios such as:
    >>> while True:
    ...     t = ProcessPoolExecutor(max_workers=5)
    ...     t.map(int, ['1', '2', '3', '4', '5'])
    """
    for thread_reference in set(_thread_references):
        if thread_reference() is None:
            _thread_references.discard(thread_reference)

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1
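
# For example (a sketch, assuming the default value of 1): a
# ProcessPoolExecutor(max_workers=2) bounds its call queue at 2 + 1 = 3
# entries, so at most three submitted calls are beyond the reach of
# Future.cancel() at any moment; later submissions still sit in the
# "Work Items" dict and can typically be cancelled:
#
#     import time
#     executor = ProcessPoolExecutor(max_workers=2)
#     futures = [executor.submit(time.sleep, 1) for _ in range(10)]
#     futures[-1].cancel()   # usually True: not yet moved to the call queue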

class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

class _ResultItem(object):
    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result

class _CallItem(object):
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

def _process_worker(call_queue, result_queue, shutdown):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A multiprocessing.Queue of _ResultItems that will be
            written to by the worker.
        shutdown: A multiprocessing.Event that will be set as a signal to the
            worker that it should exit when call_queue is empty.
    """
    while True:
        try:
            call_item = call_queue.get(block=True, timeout=0.1)
        except queue.Empty:
            if shutdown.is_set():
                return
        else:
            try:
                r = call_item.fn(*call_item.args, **call_item.kwargs)
            except BaseException as e:
                result_queue.put(_ResultItem(call_item.work_id,
                                             exception=e))
            else:
                result_queue.put(_ResultItem(call_item.work_id,
                                             result=r))

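# Illustrative sketch (hypothetical user code): because the worker above
# catches BaseException, an exception raised by the submitted callable is
# shipped back as a _ResultItem and re-raised in the parent process by
# Future.result(). The callable must be defined at module level so that it
# can be pickled:
#
#     def _fail():
#         raise ValueError('boom')
#
#     future = ProcessPoolExecutor(max_workers=1).submit(_fail)
#     future.result()   # re-raises ValueError('boom') here
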
def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Fills call_queue with _WorkItems from pending_work_items.

    This function never blocks.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while True:
        if call_queue.full():
            return
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        else:
            work_item = pending_work_items[work_id]

            if work_item.future.set_running_or_notify_cancel():
                call_queue.put(_CallItem(work_id,
                                         work_item.fn,
                                         work_item.args,
                                         work_item.kwargs),
                               block=True)
            else:
                del pending_work_items[work_id]
                continue

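# Illustrative sketch (hypothetical user code; some_slow_function is a
# stand-in name): the set_running_or_notify_cancel() call above is the point
# of no return for cancellation. A future whose call has already been
# repackaged into the "Call Q" can no longer be cancelled:
#
#     future = executor.submit(some_slow_function)
#     # ...once the local worker thread has moved the call to the call queue:
#     future.cancel()   # returns False
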
def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue,
                             shutdown_process_event):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A list of the multiprocessing.Process instances used as
            workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A multiprocessing.Queue of _ResultItems generated by the
            process workers.
        shutdown_process_event: A multiprocessing.Event used to signal the
            process workers that they should exit when their work queue is
            empty.
    """
    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        try:
            result_item = result_queue.get(block=True, timeout=0.1)
        except queue.Empty:
            executor = executor_reference()
            # No more work items can be added if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns this worker has been collected OR
            #   - The executor that owns this worker has been shut down.
            if _shutdown or executor is None or executor._shutdown_thread:
                # Since no new work items can be added, it is safe to shut
                # down this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_process_event.set()

                    # If .join() is not called on the created processes then
                    # some multiprocessing.Queue methods may deadlock on Mac OS
                    # X.
                    for p in processes:
                        p.join()
                    return
            del executor
        else:
            work_item = pending_work_items[result_item.work_id]
            del pending_work_items[result_item.work_id]

            if result_item.exception:
                work_item.future.set_exception(result_item.exception)
            else:
                work_item.future.set_result(result_item.result)

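# Illustrative sketch (hypothetical user code): the management thread holds
# only a weakref to its executor, so once user code drops its last reference,
# executor_reference() above returns None and the loop shuts the worker
# processes down after the pending work items drain:
#
#     future = ProcessPoolExecutor(max_workers=1).submit(pow, 2, 8)
#     future.result()   # 256; the now-unreferenced pool then winds down
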
_system_limits_checked = False
_system_limited = None
def _check_system_limits():
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
    _system_limits_checked = True
    try:
        import os
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # indeterminate limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores "
                       "(%d available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)

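# Illustrative sketch (output varies by platform): the check above can be
# previewed interactively; -1 marks an indeterminate limit, which passes:
#
#     >>> import os
#     >>> os.sysconf("SC_SEM_NSEMS_MAX")
#     -1
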
class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
        """
        _check_system_limits()
        _remove_dead_thread_references()

        if max_workers is None:
            self._max_workers = multiprocessing.cpu_count()
        else:
            self._max_workers = max_workers

        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        self._call_queue = multiprocessing.Queue(self._max_workers +
                                                 EXTRA_QUEUED_CALLS)
        self._result_queue = multiprocessing.Queue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        self._processes = set()

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_process_event = multiprocessing.Event()
        self._shutdown_lock = threading.Lock()
        self._queue_count = 0
        self._pending_work_items = {}

    def _start_queue_management_thread(self):
        if self._queue_management_thread is None:
            self._queue_management_thread = threading.Thread(
                    target=_queue_management_worker,
                    args=(weakref.ref(self),
                          self._processes,
                          self._pending_work_items,
                          self._work_ids,
                          self._call_queue,
                          self._result_queue,
                          self._shutdown_process_event))
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            _thread_references.add(weakref.ref(self._queue_management_thread))

    def _adjust_process_count(self):
        for _ in range(len(self._processes), self._max_workers):
            p = multiprocessing.Process(
                    target=_process_worker,
                    args=(self._call_queue,
                          self._result_queue,
                          self._shutdown_process_event))
            p.start()
            self._processes.add(p)

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1

            self._start_queue_management_thread()
            self._adjust_process_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if wait:
            if self._queue_management_thread:
                self._queue_management_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        self._call_queue = None
        self._result_queue = None
        self._shutdown_process_event = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
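
# Illustrative sketch (hypothetical user code): _base.Executor supplies
# __exit__, which calls shutdown(wait=True), so the context-manager form
# blocks until all queued calls finish before the references above are
# dropped:
#
#     with ProcessPoolExecutor() as executor:
#         results = list(executor.map(abs, [-1, -2, -3]))   # [1, 2, 3]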

atexit.register(_python_exit)