blob: f0bf6d5817dff2cf57ed3bbac6a25029aa0128d5 [file] [log] [blame]
Brian Quinlan81c4d362010-09-18 22:35:02 +00001# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4"""Implements ProcessPoolExecutor.
5
The following diagram and text describe the data-flow through the system:
7
8|======================= In-process =====================|== Out-of-process ==|
9
10+----------+ +----------+ +--------+ +-----------+ +---------+
11| | => | Work Ids | => | | => | Call Q | => | |
12| | +----------+ | | +-----------+ | |
13| | | ... | | | | ... | | |
14| | | 6 | | | | 5, call() | | |
15| | | 7 | | | | ... | | |
16| Process | | ... | | Local | +-----------+ | Process |
17| Pool | +----------+ | Worker | | #1..n |
18| Executor | | Thread | | |
19| | +----------- + | | +-----------+ | |
20| | <=> | Work Items | <=> | | <= | Result Q | <= | |
21| | +------------+ | | +-----------+ | |
22| | | 6: call() | | | | ... | | |
23| | | future | | | | 4, result | | |
24| | | ... | | | | 3, except | | |
25+----------+ +------------+ +--------+ +-----------+ +---------+
26
27Executor.submit() called:
28- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
29- adds the id of the _WorkItem to the "Work Ids" queue
30
31Local worker thread:
32- reads work ids from the "Work Ids" queue and looks up the corresponding
  _WorkItem from the "Work Items" dict: if the work item has been cancelled then
34 it is simply removed from the dict, otherwise it is repackaged as a
35 _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
36 until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
37 calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
38- reads _ResultItems from "Result Q", updates the future stored in the
39 "Work Items" dict and deletes the dict entry
40
41Process #1..n:
42- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
44"""
45
46__author__ = 'Brian Quinlan (brian@sweetapp.com)'
47
48import atexit
49from concurrent.futures import _base
50import queue
51import multiprocessing
Antoine Pitroub7877f22011-04-12 17:58:11 +020052from multiprocessing.queues import SimpleQueue
Brian Quinlan81c4d362010-09-18 22:35:02 +000053import threading
54import weakref
55
56# Workers are created as daemon threads and processes. This is done to allow the
57# interpreter to exit when there are still idle processes in a
58# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
59# allowing workers to die with the interpreter has two undesirable properties:
# - The workers would still be running during interpreter shutdown,
61# meaning that they would fail in unpredictable ways.
62# - The workers could be killed while evaluating a work item, which could
63# be bad if the callable being evaluated has external side-effects e.g.
64# writing to a file.
65#
66# To work around this problem, an exit handler is installed which tells the
67# workers to exit when their work queues are empty and then waits until the
68# threads/processes finish.
69
Antoine Pitrouc13d4542011-03-26 19:29:44 +010070_threads_queues = weakref.WeakKeyDictionary()
Brian Quinlan81c4d362010-09-18 22:35:02 +000071_shutdown = False
72
73def _python_exit():
74 global _shutdown
75 _shutdown = True
Antoine Pitrouc13d4542011-03-26 19:29:44 +010076 items = list(_threads_queues.items())
77 for t, q in items:
78 q.put(None)
79 for t, q in items:
80 t.join()
Brian Quinlan81c4d362010-09-18 22:35:02 +000081
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1
87
88class _WorkItem(object):
89 def __init__(self, future, fn, args, kwargs):
90 self.future = future
91 self.fn = fn
92 self.args = args
93 self.kwargs = kwargs
94
95class _ResultItem(object):
96 def __init__(self, work_id, exception=None, result=None):
97 self.work_id = work_id
98 self.exception = exception
99 self.result = result
100
101class _CallItem(object):
102 def __init__(self, work_id, fn, args, kwargs):
103 self.work_id = work_id
104 self.fn = fn
105 self.args = args
106 self.kwargs = kwargs
107
def _process_worker(call_queue, result_queue):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read
            and evaluated by the worker.
        result_queue: A multiprocessing.Queue of _ResultItems that this
            worker writes to.

    A None read from call_queue is the shutdown sentinel: the worker echoes
    None onto result_queue (waking the queue management thread) and returns.
    """
    while True:
        item = call_queue.get(block=True)
        if item is None:
            # Sentinel received: acknowledge and exit.
            result_queue.put(None)
            return
        try:
            value = item.fn(*item.args, **item.kwargs)
        except BaseException as exc:
            # Ship the exception object itself back to the parent process.
            result_queue.put(_ResultItem(item.work_id, exception=exc))
        else:
            result_queue.put(_ResultItem(item.work_id, result=value))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000135
def _add_call_item_to_queue(pending_work_items, work_ids, call_queue):
    """Fills call_queue with _WorkItems from pending_work_items.

    This function never blocks.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with
            _CallItems derived from _WorkItems.
    """
    # Stop as soon as the (deliberately small) call queue is full; items left
    # out of it remain cancellable via Future.cancel().
    while not call_queue.full():
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            # No more work ids to schedule right now.
            return
        work_item = pending_work_items[work_id]
        if work_item.future.set_running_or_notify_cancel():
            call_queue.put(_CallItem(work_id,
                                     work_item.fn,
                                     work_item.args,
                                     work_item.kwargs),
                           block=True)
        else:
            # The future was cancelled before it could start: drop the item.
            del pending_work_items[work_id]
172
def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A list of the multiprocessing.Process instances used as
            workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A multiprocessing.Queue of _ResultItems generated by the
            process workers.
    """
    # Count of None sentinels already sent to workers, so each worker is
    # told to exit exactly once.
    nb_shutdown_processes = 0
    def shutdown_one_process():
        """Tell a worker to terminate, which will in turn wake us again"""
        nonlocal nb_shutdown_processes
        call_queue.put(None)
        nb_shutdown_processes += 1
    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        # Blocks until a worker sends a _ResultItem, or a None wakeup token
        # arrives (posted by submit(), shutdown(), the executor's weakref
        # callback, or a worker acknowledging its own exit).
        result_item = result_queue.get()
        if result_item is not None:
            # A real result: resolve the matching future and drop the item.
            work_item = pending_work_items[result_item.work_id]
            del pending_work_items[result_item.work_id]

            if result_item.exception:
                work_item.future.set_exception(result_item.exception)
            else:
                work_item.future.set_result(result_item.result)
            continue
        # If we come here, we were explicitly woken up.
        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if _shutdown or executor is None or executor._shutdown_thread:
            # Since no new work items can be added, it is safe to shutdown
            # this thread if there are no pending work items.
            if not pending_work_items:
                while nb_shutdown_processes < len(processes):
                    shutdown_one_process()
                # If .join() is not called on the created processes then
                # some multiprocessing.Queue methods may deadlock on Mac OS
                # X.
                for p in processes:
                    p.join()
                return
            else:
                # Start shutting down by telling a process it can exit.
                shutdown_one_process()
        # Drop our strong reference so the executor can be garbage collected
        # while this thread blocks on result_queue.get() above.
        del executor
Brian Quinlan81c4d362010-09-18 22:35:02 +0000241
Martin v. Löwis9f6d48b2011-01-03 00:07:01 +0000242_system_limits_checked = False
243_system_limited = None
244def _check_system_limits():
245 global _system_limits_checked, _system_limited
246 if _system_limits_checked:
247 if _system_limited:
248 raise NotImplementedError(_system_limited)
249 _system_limits_checked = True
250 try:
251 import os
252 nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
253 except (AttributeError, ValueError):
254 # sysconf not available or setting not available
255 return
256 if nsems_max == -1:
257 # indetermine limit, assume that limit is determined
258 # by available memory only
259 return
260 if nsems_max >= 256:
261 # minimum number of semaphores available
262 # according to POSIX
263 return
264 _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
265 raise NotImplementedError(_system_limited)
266
class ProcessPoolExecutor(_base.Executor):
    """Executor that runs each submitted call in one of a pool of worker
    processes, communicating via multiprocessing queues (see module
    docstring for the data-flow diagram)."""

    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = multiprocessing.cpu_count()
        else:
            self._max_workers = max_workers

        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        self._call_queue = multiprocessing.Queue(self._max_workers +
                                                 EXTRA_QUEUED_CALLS)
        # Workers write _ResultItems (and wakeup Nones) here.
        self._result_queue = SimpleQueue()
        # Ids of submitted-but-not-yet-dispatched work items.
        self._work_ids = queue.Queue()
        # Lazily started thread running _queue_management_worker.
        self._queue_management_thread = None
        # multiprocessing.Process workers currently alive.
        self._processes = set()

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        # Monotonic counter assigning each _WorkItem a unique id.
        self._queue_count = 0
        # Maps work id -> _WorkItem for calls not yet resolved.
        self._pending_work_items = {}

    def _start_queue_management_thread(self):
        # When the executor gets lost, the weakref callback will wake up
        # the queue management thread.
        def weakref_cb(_, q=self._result_queue):
            q.put(None)
        if self._queue_management_thread is None:
            self._queue_management_thread = threading.Thread(
                    target=_queue_management_worker,
                    args=(weakref.ref(self, weakref_cb),
                          self._processes,
                          self._pending_work_items,
                          self._work_ids,
                          self._call_queue,
                          self._result_queue))
            # Daemonize so a forgotten executor cannot block interpreter
            # exit; _python_exit() performs the orderly shutdown instead.
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            _threads_queues[self._queue_management_thread] = self._result_queue

    def _adjust_process_count(self):
        # Spawn workers until the pool holds _max_workers processes.
        for _ in range(len(self._processes), self._max_workers):
            p = multiprocessing.Process(
                    target=_process_worker,
                    args=(self._call_queue,
                          self._result_queue))
            p.start()
            self._processes.add(p)

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._result_queue.put(None)

            self._start_queue_management_thread()
            self._adjust_process_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if self._queue_management_thread:
            # Wake up queue management thread
            self._result_queue.put(None)
            if wait:
                self._queue_management_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        self._call_queue = None
        self._result_queue = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
359 shutdown.__doc__ = _base.Executor.shutdown.__doc__
360
361atexit.register(_python_exit)