blob: a899d5fca87937b34b2921215bdfe62ddf350775 [file] [log] [blame]
Brian Quinlan81c4d362010-09-18 22:35:02 +00001# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4"""Implements ProcessPoolExecutor.
5
6The following diagram and text describe the data-flow through the system:
7
8|======================= In-process =====================|== Out-of-process ==|
9
10+----------+ +----------+ +--------+ +-----------+ +---------+
11| | => | Work Ids | => | | => | Call Q | => | |
12| | +----------+ | | +-----------+ | |
13| | | ... | | | | ... | | |
14| | | 6 | | | | 5, call() | | |
15| | | 7 | | | | ... | | |
16| Process | | ... | | Local | +-----------+ | Process |
17| Pool | +----------+ | Worker | | #1..n |
18| Executor | | Thread | | |
19| | +----------- + | | +-----------+ | |
20| | <=> | Work Items | <=> | | <= | Result Q | <= | |
21| | +------------+ | | +-----------+ | |
22| | | 6: call() | | | | ... | | |
23| | | future | | | | 4, result | | |
24| | | ... | | | | 3, except | | |
25+----------+ +------------+ +--------+ +-----------+ +---------+
26
27Executor.submit() called:
28- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
29- adds the id of the _WorkItem to the "Work Ids" queue
30
31Local worker thread:
32- reads work ids from the "Work Ids" queue and looks up the corresponding
33 WorkItem from the "Work Items" dict: if the work item has been cancelled then
34 it is simply removed from the dict, otherwise it is repackaged as a
35 _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
36 until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
37 calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
38- reads _ResultItems from "Result Q", updates the future stored in the
39 "Work Items" dict and deletes the dict entry
40
41Process #1..n:
42- reads _CallItems from "Call Q", executes the calls, and puts the resulting
43 _ResultItems in "Result Q"
44"""
45
46__author__ = 'Brian Quinlan (brian@sweetapp.com)'
47
48import atexit
49from concurrent.futures import _base
50import queue
51import multiprocessing
52import threading
53import weakref
54
55# Workers are created as daemon threads and processes. This is done to allow the
56# interpreter to exit when there are still idle processes in a
57# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
58# allowing workers to die with the interpreter has two undesirable properties:
59# - The workers would still be running during interpreter shutdown,
60# meaning that they would fail in unpredictable ways.
61# - The workers could be killed while evaluating a work item, which could
62# be bad if the callable being evaluated has external side-effects e.g.
63# writing to a file.
64#
65# To work around this problem, an exit handler is installed which tells the
66# workers to exit when their work queues are empty and then waits until the
67# threads/processes finish.
68
# Maps each started queue management thread to the result queue used to wake
# it up. Keys are weak, so a thread object that is garbage collected drops out.
_threads_queues = weakref.WeakKeyDictionary()
# Becomes True once _python_exit() has run.
_shutdown = False

def _python_exit():
    """Flag interpreter shutdown, wake every management thread, then join it.

    Sets the module-level ``_shutdown`` flag and posts a ``None`` wake-up to
    the result queue of each registered thread. Only after every thread has
    been woken does it join them, one after another.
    """
    global _shutdown
    _shutdown = True
    # Work on a snapshot so later changes to the weak mapping cannot
    # interfere with the two passes below.
    pairs = list(_threads_queues.items())
    for wakeup_queue in [q for _t, q in pairs]:
        wakeup_queue.put(None)
    for management_thread in [t for t, _q in pairs]:
        management_thread.join()
Brian Quinlan81c4d362010-09-18 22:35:02 +000080
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
# ProcessPoolExecutor sizes its call queue as max_workers + EXTRA_QUEUED_CALLS.
EXTRA_QUEUED_CALLS = 1
86
class _WorkItem(object):
    """A submitted call that has not been handed to a worker process yet.

    Bundles the Future returned to the caller with the callable and the
    positional and keyword arguments it is to be invoked with.
    """

    def __init__(self, future, fn, args, kwargs):
        self.future, self.fn = future, fn
        self.args, self.kwargs = args, kwargs
93
class _ResultItem(object):
    """Outcome of one call, sent by a worker process back to the executor.

    ``work_id`` names the originating work item. Workers fill in either
    ``exception`` (the call raised) or ``result`` (the call returned); the
    other field keeps its default of None.
    """

    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception, self.result = exception, result
99
class _CallItem(object):
    """A call placed on the call queue for a worker process to evaluate.

    Carries the work id (so the result can be matched back to its future)
    plus the callable and its positional and keyword arguments.
    """

    def __init__(self, work_id, fn, args, kwargs):
        self.work_id, self.fn = work_id, fn
        self.args, self.kwargs = args, kwargs
106
def _process_worker(call_queue, result_queue, shutdown):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process. It keeps evaluating _CallItems
    until it reads a ``None`` sentinel from call_queue; it then puts ``None``
    into result_queue (to wake the queue management thread) and returns.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read and
            evaluated by the worker. A ``None`` item tells the worker to exit.
        result_queue: A multiprocessing.Queue of _ResultItems that will be
            written to by the worker.
        shutdown: A multiprocessing.Event set by the queue management thread
            when it shuts the pool down. Not consulted here -- termination is
            driven solely by the ``None`` sentinel -- but kept so the
            signature stays compatible with existing callers.
    """
    while True:
        # get() blocks without a timeout, so it never raises queue.Empty; the
        # None sentinel is the only way out of this loop.
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(None)
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            # BaseException on purpose: anything the call raises (including
            # SystemExit or KeyboardInterrupt) is reported through its future.
            result_queue.put(_ResultItem(call_item.work_id, exception=e))
        else:
            result_queue.put(_ResultItem(call_item.work_id, result=r))
139
def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Move runnable work from work_ids into call_queue as _CallItems.

    Repeatedly takes the next work id (without waiting) and looks up its
    _WorkItem. If the item's future can be marked running, the item is
    repackaged as a _CallItem and put on call_queue. Otherwise the future
    was cancelled, and the item is simply dropped from pending_work_items.
    Returns as soon as call_queue reports full or work_ids is empty.

    A put only happens after call_queue.full() reported space. In this module
    the queue management thread is the only code that puts into call_queue,
    so that put is not expected to block.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]); ids are
            consumed by this function.
        call_queue: A multiprocessing.Queue that receives the _CallItems.
    """
    while not call_queue.full():
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return

        item = pending_work_items[work_id]
        if not item.future.set_running_or_notify_cancel():
            # Cancelled while still waiting here: forget about it.
            del pending_work_items[work_id]
            continue

        call_queue.put(_CallItem(work_id, item.fn, item.args, item.kwargs),
                       block=True)
176
def _queue_manangement_worker(executor_reference,
                              processes,
                              pending_work_items,
                              work_ids_queue,
                              call_queue,
                              result_queue,
                              shutdown_process_event):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread. It keeps call_queue topped up from
    work_ids_queue and completes futures as results arrive on result_queue.
    Once no new work can be submitted and every pending work item has
    finished, it tells each worker process to exit, joins them and returns.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: The multiprocessing.Process instances used as workers
            (ProcessPoolExecutor passes its set of processes).
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
            A None item tells one worker process to exit.
        result_queue: A multiprocessing.Queue of _ResultItems generated by the
            process workers. A None item is only a wake-up signal.
        shutdown_process_event: A multiprocessing.Event that is set just
            before the worker processes are told to exit.
    """
    def shutdown_worker_processes():
        """Send one exit sentinel per worker process, then join them all."""
        shutdown_process_event.set()
        # With no pending work items left, call_queue holds no _CallItems
        # (items leave pending_work_items only once their result arrived), so
        # these sentinels should fit: call_queue has room for more entries
        # than there are worker processes.
        for _ in processes:
            call_queue.put(None)
        # If .join() is not called on the created processes then
        # some multiprocessing.Queue methods may deadlock on Mac OS X.
        for p in processes:
            p.join()

    while True:
        # Hand queued work to the workers and drop work items whose futures
        # were cancelled before they reached call_queue.
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if _shutdown or executor is None or executor._shutdown_thread:
            # Since no new work items can be added, it is safe to shut down
            # once there are no pending work items. Worker processes are only
            # told to exit at that point: retiring them earlier could leave
            # pending work with no process to run it, and then block forever
            # putting sentinels into a full call_queue.
            if not pending_work_items:
                shutdown_worker_processes()
                return
        # Do not keep the executor alive while blocked below.
        del executor

        # Blocks until a result arrives or a None wake-up is posted (by
        # submit(), shutdown(), the executor's weakref callback or
        # _python_exit()).
        result_item = result_queue.get(block=True)
        if result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id)
            if result_item.exception is not None:
                work_item.future.set_exception(result_item.exception)
            else:
                work_item.future.set_result(result_item.result)
            # Release the finished work item (fn, args, ...) before blocking
            # again.
            del work_item
Brian Quinlan81c4d362010-09-18 22:35:02 +0000255
# Whether _check_system_limits() has already probed the system.
_system_limits_checked = False
# Error message describing the limitation found, or None if there is none.
_system_limited = None
def _check_system_limits():
    """Raise NotImplementedError if the system offers too few semaphores.

    The outcome of the first call is cached in the module globals above.
    Later calls either re-raise the cached limitation or return immediately
    without probing the system again.

    Raises:
        NotImplementedError: if os.sysconf("SC_SEM_NSEMS_MAX") reports fewer
            than 256 semaphores.
    """
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
        # Already checked and found fine: nothing more to do.
        return
    _system_limits_checked = True
    try:
        import os
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # indeterminate limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores "
                       "(%d available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)
280
class ProcessPoolExecutor(_base.Executor):
    """Executor that evaluates submitted calls in a pool of worker processes."""

    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.

        Raises:
            ValueError: If max_workers is given and is not greater than 0
                (with no worker processes, submitted calls would never run).
            NotImplementedError: If the system provides too few semaphores.
        """
        # Validate the argument before probing the system.
        if max_workers is not None and max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")
        _check_system_limits()

        if max_workers is None:
            self._max_workers = multiprocessing.cpu_count()
        else:
            self._max_workers = max_workers

        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        self._call_queue = multiprocessing.Queue(self._max_workers +
                                                 EXTRA_QUEUED_CALLS)
        self._result_queue = multiprocessing.Queue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        self._processes = set()

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_process_event = multiprocessing.Event()
        self._shutdown_lock = threading.Lock()
        self._queue_count = 0
        self._pending_work_items = {}

    def _start_queue_management_thread(self):
        # When the executor gets lost, the weakref callback will wake up
        # the queue management thread.
        def weakref_cb(_, q=self._result_queue):
            q.put(None)
        if self._queue_management_thread is None:
            self._queue_management_thread = threading.Thread(
                    target=_queue_manangement_worker,
                    args=(weakref.ref(self, weakref_cb),
                          self._processes,
                          self._pending_work_items,
                          self._work_ids,
                          self._call_queue,
                          self._result_queue,
                          self._shutdown_process_event))
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            _threads_queues[self._queue_management_thread] = self._result_queue

    def _adjust_process_count(self):
        # Start worker processes until the pool holds max_workers of them.
        for _ in range(len(self._processes), self._max_workers):
            p = multiprocessing.Process(
                    target=_process_worker,
                    args=(self._call_queue,
                          self._result_queue,
                          self._shutdown_process_event))
            p.start()
            self._processes.add(p)

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._result_queue.put(None)

            self._start_queue_management_thread()
            self._adjust_process_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if self._queue_management_thread:
            # Wake up queue management thread
            self._result_queue.put(None)
            if wait:
                self._queue_management_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        self._call_queue = None
        self._result_queue = None
        self._shutdown_process_event = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
378
# Run _python_exit() when the interpreter exits: it sets _shutdown, wakes every
# registered queue management thread and joins it, so those threads get the
# chance to finish pending work and stop their worker processes first.
atexit.register(_python_exit)