# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
|          |     +----------+       |        |     +-----------+    |         |
|          |     | ...      |       |        |     | ...       |    |         |
|          |     | 6        |       |        |     | 5, call() |    |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  _WorkItem from the "Work Items" dict: if the work item has been cancelled
  then it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
"""

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

import atexit
from concurrent.futures import _base
import queue
import multiprocessing
import threading
import weakref

# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.

_thread_references = set()
_shutdown = False

def _python_exit():
    global _shutdown
    _shutdown = True
    for thread_reference in _thread_references:
        thread = thread_reference()
        if thread is not None:
            thread.join()

def _remove_dead_thread_references():
    """Remove inactive threads from _thread_references.

    Should be called periodically to prevent memory leaks in scenarios such as:
    >>> while True:
    ...     t = ProcessPoolExecutor(max_workers=5)
    ...     t.map(int, ['1', '2', '3', '4', '5'])
    """
    for thread_reference in set(_thread_references):
        if thread_reference() is None:
            _thread_references.discard(thread_reference)

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1
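# For example (illustrative numbers, not defined by this module): with
# max_workers=4 and EXTRA_QUEUED_CALLS=1, the call queue created in
# ProcessPoolExecutor.__init__ holds at most 5 _CallItems at any time.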

class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

class _ResultItem(object):
    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result

class _CallItem(object):
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

def _process_worker(call_queue, result_queue, shutdown):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A multiprocessing.Queue of _ResultItems that will be
            written to by the worker.
        shutdown: A multiprocessing.Event that will be set as a signal to the
            worker that it should exit when call_queue is empty.
    """
    while True:
        try:
            call_item = call_queue.get(block=True, timeout=0.1)
        except queue.Empty:
            if shutdown.is_set():
                return
        else:
            try:
                r = call_item.fn(*call_item.args, **call_item.kwargs)
            except BaseException as e:
                result_queue.put(_ResultItem(call_item.work_id,
                                             exception=e))
            else:
                result_queue.put(_ResultItem(call_item.work_id,
                                             result=r))

def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Fills call_queue with _WorkItems from pending_work_items.

    This function never blocks.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while True:
        if call_queue.full():
            return
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        else:
            work_item = pending_work_items[work_id]

            if work_item.future.set_running_or_notify_cancel():
                call_queue.put(_CallItem(work_id,
                                         work_item.fn,
                                         work_item.args,
                                         work_item.kwargs),
                               block=True)
            else:
                del pending_work_items[work_id]
                continue

def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue,
                             shutdown_process_event):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A list of the multiprocessing.Process instances used as
            workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A multiprocessing.Queue of _ResultItems generated by the
            process workers.
        shutdown_process_event: A multiprocessing.Event used to signal the
            process workers that they should exit when their work queue is
            empty.
    """
    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        try:
            result_item = result_queue.get(block=True, timeout=0.1)
        except queue.Empty:
            executor = executor_reference()
            # No more work items can be added if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns this worker has been collected OR
            #   - The executor that owns this worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown_thread:
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_process_event.set()

                    # If .join() is not called on the created processes then
                    # some multiprocessing.Queue methods may deadlock on Mac OS
                    # X.
                    for p in processes:
                        p.join()
                    return
            del executor
        else:
            work_item = pending_work_items[result_item.work_id]
            del pending_work_items[result_item.work_id]

            if result_item.exception:
                work_item.future.set_exception(result_item.exception)
            else:
                work_item.future.set_result(result_item.result)

class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
        """
        _remove_dead_thread_references()

        if max_workers is None:
            self._max_workers = multiprocessing.cpu_count()
        else:
            self._max_workers = max_workers

        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        self._call_queue = multiprocessing.Queue(self._max_workers +
                                                 EXTRA_QUEUED_CALLS)
        self._result_queue = multiprocessing.Queue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        self._processes = set()

        # Shutdown is a two-step process: shutdown() first sets
        # _shutdown_thread to stop new submissions, then the queue management
        # thread sets _shutdown_process_event to tell the worker processes to
        # exit once their work queue is empty.
        self._shutdown_thread = False
        self._shutdown_process_event = multiprocessing.Event()
        self._shutdown_lock = threading.Lock()
        self._queue_count = 0
        self._pending_work_items = {}

    def _start_queue_management_thread(self):
        if self._queue_management_thread is None:
            self._queue_management_thread = threading.Thread(
                    target=_queue_management_worker,
                    args=(weakref.ref(self),
                          self._processes,
                          self._pending_work_items,
                          self._work_ids,
                          self._call_queue,
                          self._result_queue,
                          self._shutdown_process_event))
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            _thread_references.add(weakref.ref(self._queue_management_thread))

    def _adjust_process_count(self):
        for _ in range(len(self._processes), self._max_workers):
            p = multiprocessing.Process(
                    target=_process_worker,
                    args=(self._call_queue,
                          self._result_queue,
                          self._shutdown_process_event))
            p.start()
            self._processes.add(p)

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1

            self._start_queue_management_thread()
            self._adjust_process_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if wait:
            if self._queue_management_thread:
                self._queue_management_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        self._call_queue = None
        self._result_queue = None
        self._shutdown_process_event = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__

atexit.register(_python_exit)