blob: 7f31ec2263badbdd219d7c0688c6958ce623a49f [file] [log] [blame]
Brian Quinlan81c4d362010-09-18 22:35:02 +00001# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
|          |     +----------+       |        |     +-----------+    |         |
|          |     | ...      |       |        |     | ...       |    |         |
|          |     | 6        |       |        |     | 5, call() |    |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  _WorkItem from the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
"""
45
# Module author metadata.
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
47
48import atexit
Antoine Pitroudd696492011-06-08 17:21:55 +020049import os
Brian Quinlan81c4d362010-09-18 22:35:02 +000050from concurrent.futures import _base
51import queue
52import multiprocessing
Antoine Pitrou020436b2011-07-02 21:20:25 +020053from multiprocessing.queues import SimpleQueue, SentinelReady, Full
Brian Quinlan81c4d362010-09-18 22:35:02 +000054import threading
55import weakref
56
# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.
70
# Maps each queue management thread to the result queue that is used to wake
# it up. Keys are held weakly, so an entry goes away with its thread object.
_threads_queues = weakref.WeakKeyDictionary()
# Becomes True once _python_exit() has run.
_shutdown = False


def _python_exit():
    """Ask every registered queue management thread to stop, then join them.

    Sets the module-level _shutdown flag, puts None on the queue associated
    with each registered thread so that the thread wakes up, and finally
    waits for every one of those threads to finish.
    """
    global _shutdown
    _shutdown = True
    # Take a snapshot first: a weak dictionary may change while iterated.
    items = list(_threads_queues.items())
    # Wake all threads before joining any so they can wind down concurrently.
    for _, q in items:
        q.put(None)
    for t, _ in items:
        t.join()
Brian Quinlan81c4d362010-09-18 22:35:02 +000082
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1
88
class _WorkItem(object):
    """A submitted call together with the future that reports its outcome."""

    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
95
class _ResultItem(object):
    """The outcome of one call: its work id plus an exception or a result."""

    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result
101
class _CallItem(object):
    """A call identified by its work id, without the associated future."""

    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
108
def _process_worker(call_queue, result_queue):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A queue of _CallItems that will be read and evaluated by
            the worker. Reading None tells the worker to exit.
        result_queue: A queue that the worker writes _ResultItems to. Just
            before exiting, the worker writes its own pid instead.
    """
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            # Every failure of the call, whatever its type, is reported back
            # through the result queue rather than raised in this process.
            result_queue.put(_ResultItem(call_item.work_id, exception=e))
        else:
            result_queue.put(_ResultItem(call_item.work_id, result=r))
Brian Quinlan81c4d362010-09-18 22:35:02 +0000136
def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Fills call_queue with _CallItems built from pending_work_items.

    Returns as soon as call_queue reports that it is full or work_ids has
    nothing left to read.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while True:
        if call_queue.full():
            return
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return

        work_item = pending_work_items[work_id]
        if work_item.future.set_running_or_notify_cancel():
            call_queue.put(_CallItem(work_id,
                                     work_item.fn,
                                     work_item.args,
                                     work_item.kwargs),
                           block=True)
        else:
            # The future was cancelled before it started: forget the item.
            del pending_work_items[work_id]
173
def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A dict mapping pids to the multiprocessing.Process
            instances used as workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A queue of _ResultItems generated by the process
            workers. It also carries None (a plain wake-up) and worker pids
            (a worker announcing its clean exit).
    """
    # Only holds a strong reference to the executor for short stretches, so
    # that this thread does not keep the executor alive.
    executor = None

    def shutting_down():
        return _shutdown or executor is None or executor._shutdown_thread

    def shutdown_worker():
        # This is an upper bound
        nb_children_alive = sum(p.is_alive() for p in processes.values())
        for _ in range(0, nb_children_alive):
            call_queue.put_nowait(None)
        # Release the queue's resources as soon as possible.
        call_queue.close()
        # If .join() is not called on the created processes then
        # some multiprocessing.Queue methods may deadlock on Mac OS X.
        for p in processes.values():
            p.join()

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        sentinels = [p.sentinel for p in processes.values()]
        assert sentinels
        try:
            result_item = result_queue.get(sentinels=sentinels)
        except SentinelReady:
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = True
                executor._shutdown_thread = True
                executor = None
            # All futures in flight must be marked failed
            for work_item in pending_work_items.values():
                work_item.future.set_exception(
                    BrokenProcessPool(
                        "A process in the process pool was "
                        "terminated abruptly while the future was "
                        "running or pending."
                    ))
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return
        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                # Compare against None rather than relying on truthiness, so
                # an exception object that evaluates as false is still
                # reported as a failure.
                if result_item.exception is not None:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if shutting_down():
            try:
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (in
                # result_queue.get()) and be able to send a sentinel again.
                pass
        executor = None
Brian Quinlan81c4d362010-09-18 22:35:02 +0000281
# Whether _check_system_limits() has already performed its check.
_system_limits_checked = False
# Error message describing the limitation, or None when none was found.
_system_limited = None


def _check_system_limits():
    """Raise NotImplementedError if the system offers too few semaphores.

    The check is performed once; later calls reuse the stored outcome and
    either return immediately or raise again with the same message.

    Raises:
        NotImplementedError: If os.sysconf("SC_SEM_NSEMS_MAX") reports a
            limit that is neither -1 nor at least 256.
    """
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
        # Already checked and found adequate: do not query the system again.
        return
    _system_limits_checked = True
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # indeterminate limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores "
                       "(%d available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)
305
Antoine Pitroudd696492011-06-08 17:21:55 +0200306
class BrokenProcessPool(RuntimeError):
    """
    Raised when a process in a ProcessPoolExecutor terminated abruptly
    while a future was in the running or pending state.
    """
312
313
class ProcessPoolExecutor(_base.Executor):
    """Executor that evaluates submitted calls in a pool of worker processes."""

    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.

        Raises:
            ValueError: If max_workers is given but is not greater than 0.
        """
        # A pool without workers could never run anything, so reject it here
        # instead of failing later inside the queue management thread.
        if max_workers is not None and max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        _check_system_limits()

        if max_workers is None:
            self._max_workers = multiprocessing.cpu_count()
        else:
            self._max_workers = max_workers

        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        self._call_queue = multiprocessing.Queue(self._max_workers +
                                                 EXTRA_QUEUED_CALLS)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = SimpleQueue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._broken = False
        # Id that will be given to the next submitted work item.
        self._queue_count = 0
        self._pending_work_items = {}

    def _start_queue_management_thread(self):
        """Start the worker processes and the management thread, once only."""
        # When the executor gets lost, the weakref callback will wake up
        # the queue management thread.
        def weakref_cb(_, q=self._result_queue):
            q.put(None)
        if self._queue_management_thread is None:
            # Start the processes so that their sentinels are known.
            self._adjust_process_count()
            self._queue_management_thread = threading.Thread(
                    target=_queue_management_worker,
                    args=(weakref.ref(self, weakref_cb),
                          self._processes,
                          self._pending_work_items,
                          self._work_ids,
                          self._call_queue,
                          self._result_queue))
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            # Register the thread so the exit handler can wake and join it.
            _threads_queues[self._queue_management_thread] = self._result_queue

    def _adjust_process_count(self):
        """Start worker processes until there are _max_workers of them."""
        for _ in range(len(self._processes), self._max_workers):
            p = multiprocessing.Process(
                    target=_process_worker,
                    args=(self._call_queue,
                          self._result_queue))
            p.start()
            self._processes[p.pid] = p

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool('A child process terminated '
                    'abruptly, the process pool is not usable anymore')
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._result_queue.put(None)

            self._start_queue_management_thread()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if self._queue_management_thread:
            # Wake up queue management thread
            self._result_queue.put(None)
            if wait:
                self._queue_management_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        self._call_queue = None
        self._result_queue = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
417
# Have the interpreter run _python_exit() when it exits.
atexit.register(_python_exit)