blob: 50ee296ac89b7841663a92dfd82100f37126d690 [file] [log] [blame]
Brian Quinlan81c4d362010-09-18 22:35:02 +00001# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4"""Implements ProcessPoolExecutor.
5
The following diagram and text describe the data-flow through the system:
7
|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
|          |     +----------+       |        |     +-----------+    |         |
|          |     | ...      |       |        |     | ...       |    |         |
|          |     | 6        |       |        |     | 5, call() |    |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+
26
27Executor.submit() called:
28- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
29- adds the id of the _WorkItem to the "Work Ids" queue
30
31Local worker thread:
32- reads work ids from the "Work Ids" queue and looks up the corresponding
33 WorkItem from the "Work Items" dict: if the work item has been cancelled then
34 it is simply removed from the dict, otherwise it is repackaged as a
35 _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
36 until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
37 calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
38- reads _ResultItems from "Result Q", updates the future stored in the
39 "Work Items" dict and deletes the dict entry
40
41Process #1..n:
42- reads _CallItems from "Call Q", executes the calls, and puts the resulting
Mark Dickinson5ee24042012-10-20 13:16:49 +010043 _ResultItems in "Result Q"
Brian Quinlan81c4d362010-09-18 22:35:02 +000044"""
45
46__author__ = 'Brian Quinlan (brian@sweetapp.com)'
47
48import atexit
Antoine Pitroudd696492011-06-08 17:21:55 +020049import os
Brian Quinlan81c4d362010-09-18 22:35:02 +000050from concurrent.futures import _base
51import queue
Richard Oudkerk1f2eaa92013-10-16 17:06:22 +010052from queue import Full
Brian Quinlan81c4d362010-09-18 22:35:02 +000053import multiprocessing
Richard Oudkerk1f2eaa92013-10-16 17:06:22 +010054from multiprocessing import SimpleQueue
Antoine Pitroubdb1cf12012-03-05 19:28:37 +010055from multiprocessing.connection import wait
Brian Quinlan81c4d362010-09-18 22:35:02 +000056import threading
57import weakref
Antoine Pitrou4aae2762014-10-04 20:20:10 +020058from functools import partial
59import itertools
Antoine Pitrou1285c9b2015-01-17 20:02:14 +010060import traceback
Brian Quinlan81c4d362010-09-18 22:35:02 +000061
62# Workers are created as daemon threads and processes. This is done to allow the
63# interpreter to exit when there are still idle processes in a
64# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
65# allowing workers to die with the interpreter has two undesirable properties:
Raymond Hettinger15f44ab2016-08-30 10:47:49 -070066# - The workers would still be running during interpreter shutdown,
Brian Quinlan81c4d362010-09-18 22:35:02 +000067# meaning that they would fail in unpredictable ways.
68# - The workers could be killed while evaluating a work item, which could
69# be bad if the callable being evaluated has external side-effects e.g.
70# writing to a file.
71#
72# To work around this problem, an exit handler is installed which tells the
73# workers to exit when their work queues are empty and then waits until the
74# threads/processes finish.
75
# Maps each running queue management thread to the result queue used to wake
# it up. Weak keys let entries vanish once a thread object is collected.
_threads_queues = weakref.WeakKeyDictionary()
# Becomes True once the interpreter-exit handler below has run.
_shutdown = False


def _python_exit():
    """Tell every registered management thread to stop, then wait for them.

    Sets the module-level ``_shutdown`` flag, puts a ``None`` wake-up item on
    each registered queue and finally joins each registered thread.
    """
    global _shutdown
    _shutdown = True
    # Snapshot the weak dictionary so both passes see the same entries.
    registered = list(_threads_queues.items())
    # Wake all threads first so that they can wind down concurrently ...
    for _, wakeup_queue in registered:
        wakeup_queue.put(None)
    # ... and only then block on each of them in turn.
    for thread, _ in registered:
        thread.join()
Brian Quinlan81c4d362010-09-18 22:35:02 +000087
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
# ProcessPoolExecutor sizes its call queue as max_workers + EXTRA_QUEUED_CALLS.
EXTRA_QUEUED_CALLS = 1
93
Antoine Pitrou1285c9b2015-01-17 20:02:14 +010094# Hack to embed stringification of remote traceback in local traceback
95
class _RemoteTraceback(Exception):
    """Exception whose string form is a pre-formatted traceback text.

    Instances are attached as the ``__cause__`` of an exception rebuilt from
    a worker, so the worker-side traceback is shown with the local one.
    """

    def __init__(self, tb):
        # *tb* is already text; it is handed back unchanged by __str__.
        self.tb = tb

    def __str__(self):
        return self.tb
101
class _ExceptionWithTraceback:
    """Pairs an exception with the formatted text of its traceback.

    The traceback is rendered to a string when the wrapper is built. When the
    wrapper is pickled, ``__reduce__`` arranges for ``_rebuild_exc`` to be
    called with the exception and that text on unpickling.
    """

    def __init__(self, exc, tb):
        formatted = ''.join(traceback.format_exception(type(exc), exc, tb))
        self.exc = exc
        # Wrap the text in triple quotes so it stands out when displayed.
        self.tb = '\n"""\n%s"""' % formatted

    def __reduce__(self):
        rebuild_args = (self.exc, self.tb)
        return _rebuild_exc, rebuild_args
110
def _rebuild_exc(exc, tb):
    """Return *exc* with a _RemoteTraceback built from *tb* as its cause.

    Args:
        exc: The exception instance to hand back.
        tb: The formatted traceback text to embed.
    """
    remote = _RemoteTraceback(tb)
    exc.__cause__ = remote
    return exc
114
class _WorkItem:
    """A submitted call together with the future that will hold its outcome."""

    def __init__(self, future, fn, args, kwargs):
        self.future, self.fn = future, fn
        self.args, self.kwargs = args, kwargs
121
class _ResultItem:
    """Outcome of one call: its work id plus either an exception or a result."""

    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception, self.result = exception, result
127
class _CallItem:
    """A call to execute, tagged with the id of the work item it came from."""

    def __init__(self, work_id, fn, args, kwargs):
        self.work_id, self.fn = work_id, fn
        self.args, self.kwargs = args, kwargs
134
def _get_chunks(*iterables, chunksize):
    """Yield tuples of at most *chunksize* items taken from zip(*iterables).

    Each item is itself the argument tuple produced by zip(). Iteration stops
    at the first empty chunk.
    """
    zipped = zip(*iterables)
    # iter(callable, sentinel) keeps calling until an empty tuple comes back.
    yield from iter(lambda: tuple(itertools.islice(zipped, chunksize)), ())
143
def _process_chunk(fn, chunk):
    """Apply *fn* to every argument tuple in *chunk* and return the results.

    Each element of *chunk* is unpacked into positional arguments for *fn*.
    The results are returned as a list, in the order of the chunk.

    This function is run in a separate process.
    """
    return list(itertools.starmap(fn, chunk))
154
def _process_worker(call_queue, result_queue):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A queue of _CallItems that will be read and evaluated by
            the worker. A None item tells the worker to exit.
        result_queue: A queue to which the worker writes one _ResultItem per
            evaluated call, and its own pid just before exiting.
    """
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            result_item = _ResultItem(call_item.work_id, exception=exc)
            del exc
        else:
            result_item = _ResultItem(call_item.work_id, result=r)
            del r
        result_queue.put(result_item)
        # Drop our references before blocking on the next get(), so that the
        # arguments and result of the finished call are not kept alive while
        # the worker sits idle.
        del call_item, result_item
Brian Quinlan81c4d362010-09-18 22:35:02 +0000182
def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Moves pending work into call_queue until it is full or ids run out.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Ids are
            consumed in order; the matching _WorkItem is either turned into a
            _CallItem and put in call_queue, or, when its future reports that
            it was cancelled, removed from pending_work_items.
        call_queue: The queue that receives the _CallItems. It is only
            written to after full() reported free space.
    """
    while not call_queue.full():
        try:
            next_id = work_ids.get(block=False)
        except queue.Empty:
            # Nothing left to schedule.
            break
        item = pending_work_items[next_id]
        if not item.future.set_running_or_notify_cancel():
            # Cancelled before it was scheduled: forget about it.
            del pending_work_items[next_id]
            continue
        call = _CallItem(next_id, item.fn, item.args, item.kwargs)
        call_queue.put(call, block=True)
219
def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A dict mapping worker pids to the multiprocessing.Process
            instances used as workers. Entries are popped as workers report
            a clean exit.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A queue of _ResultItems generated by the process
            workers. It also carries None wake-up items and the pids of
            workers that exited cleanly; its _reader end is waited on here.
    """
    # Strong reference to the executor, held only while a loop iteration
    # inspects it; reset to None so this thread never keeps it alive.
    executor = None

    def shutting_down():
        # Reads `executor` from the enclosing scope at call time.
        return _shutdown or executor is None or executor._shutdown_thread

    def shutdown_worker():
        # Send one None sentinel per live worker; put_nowait() can raise
        # queue.Full, which the main loop below tolerates and retries.
        # This is an upper bound
        nb_children_alive = sum(p.is_alive() for p in processes.values())
        for i in range(0, nb_children_alive):
            call_queue.put_nowait(None)
        # Release the queue's resources as soon as possible.
        call_queue.close()
        # If .join() is not called on the created processes then
        # some multiprocessing.Queue methods may deadlock on Mac OS X.
        for p in processes.values():
            p.join()

    # Waited on together with the workers' sentinels in the loop below.
    reader = result_queue._reader

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        sentinels = [p.sentinel for p in processes.values()]
        assert sentinels
        # Block until there is something to read or a worker sentinel fires.
        ready = wait([reader] + sentinels)
        if reader in ready:
            result_item = reader.recv()
        else:
            # Only sentinels are ready: a worker ended without announcing it.
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = True
                executor._shutdown_thread = True
                executor = None
            # All futures in flight must be marked failed
            for work_id, work_item in pending_work_items.items():
                work_item.future.set_exception(
                    BrokenProcessPool(
                        "A process in the process pool was "
                        "terminated abruptly while the future was "
                        "running or pending."
                    ))
                # Delete references to object. See issue16284
                del work_item
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return
        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            # A real _ResultItem (None items are mere wake-ups).
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284
                del work_item
        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if shutting_down():
            try:
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (by
                # the wait() call at the top of the loop) and be able to send
                # a sentinel again.
                pass
        executor = None
Brian Quinlan81c4d362010-09-18 22:35:02 +0000334
# Cache for _check_system_limits(): whether the probe has completed, and the
# error message to raise if the system turned out to be too limited.
_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    """Raise NotImplementedError if the system provides too few semaphores.

    The limit is read from os.sysconf("SC_SEM_NSEMS_MAX") on the first call
    and the outcome is cached, so later calls return (or raise the same
    error) without querying the system again.

    Raises:
        NotImplementedError: If fewer than 256 semaphores are available.
    """
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
        # Already verified as adequate: nothing more to do.
        return
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        nsems_max = None
    # -1 means an undetermined limit (assume it is bounded by available
    # memory only); 256 is the minimum number of semaphores available
    # according to POSIX.
    if nsems_max is not None and nsems_max != -1 and nsems_max < 256:
        _system_limited = ("system provides too few semaphores "
                           "(%d available, 256 necessary)" % nsems_max)
    # Record completion only once the verdict is known, so a concurrent
    # caller never takes the cached path on a half-finished probe.
    _system_limits_checked = True
    if _system_limited:
        raise NotImplementedError(_system_limited)
358
Antoine Pitroudd696492011-06-08 17:21:55 +0200359
def _chain_from_iterable_of_lists(iterable):
    """
    Flatten an iterable of lists, like itertools.chain.from_iterable.

    Each list is consumed destructively: items are popped off as they are
    yielded, so this function keeps no reference to an item once it has
    been handed to the caller.
    """
    for batch in iterable:
        # Reverse once so that popping from the end yields the items in
        # their original order.
        batch.reverse()
        take = batch.pop
        while batch:
            yield take()
370
371
class BrokenProcessPool(RuntimeError):
    """
    Raised when a process in a ProcessPoolExecutor terminated abruptly.

    Futures that were running or pending at that point fail with this
    error, and the executor refuses further submissions with it.
    """
377
378
class ProcessPoolExecutor(_base.Executor):
    """Executor that runs submitted calls in a pool of worker processes."""

    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has
                processors.

        Raises:
            ValueError: If max_workers is given and is not greater than 0.
        """
        _check_system_limits()

        if max_workers is not None:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")
            self._max_workers = max_workers
        else:
            self._max_workers = os.cpu_count() or 1

        # The call queue is slightly larger than the number of processes so
        # that workers do not idle, yet small because futures that reached
        # the call queue cannot be cancelled any more.
        call_queue_size = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = multiprocessing.Queue(call_queue_size)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. Killed processes are
        # detected anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = SimpleQueue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._broken = False
        self._queue_count = 0
        self._pending_work_items = {}

    def _start_queue_management_thread(self):
        # Nothing to do when the management thread already exists.
        if self._queue_management_thread is not None:
            return

        # When the executor gets lost, the weakref callback will wake up
        # the queue management thread.
        def weakref_cb(_, q=self._result_queue):
            q.put(None)

        # Start the processes so that their sentinels are known.
        self._adjust_process_count()
        manager = threading.Thread(
            target=_queue_management_worker,
            args=(weakref.ref(self, weakref_cb),
                  self._processes,
                  self._pending_work_items,
                  self._work_ids,
                  self._call_queue,
                  self._result_queue))
        self._queue_management_thread = manager
        manager.daemon = True
        manager.start()
        # Register the thread so the exit handler can wake and join it.
        _threads_queues[manager] = self._result_queue

    def _adjust_process_count(self):
        # Start as many workers as are missing to reach max_workers.
        missing = self._max_workers - len(self._processes)
        for _ in range(missing):
            worker = multiprocessing.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue))
            worker.start()
            self._processes[worker.pid] = worker

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool('A child process terminated '
                    'abruptly, the process pool is not usable anymore')
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            future = _base.Future()
            work_id = self._queue_count
            # Record the work item before publishing its id.
            self._pending_work_items[work_id] = _WorkItem(
                future, fn, args, kwargs)
            self._work_ids.put(work_id)
            self._queue_count = work_id + 1
            # Wake up queue management thread
            self._result_queue.put(None)

            self._start_queue_management_thread()
            return future
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, *iterables).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then
                there is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped
                into chunks of size chunksize and submitted to the process
                pool. If set to one, the items in the list will be sent one
                at a time.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls
            may be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be
                generated before the given timeout.
            ValueError: If chunksize is less than 1.
            Exception: If fn(*args) raises for any values.
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        # Every submitted call evaluates one whole chunk and returns a list.
        chunk_fn = partial(_process_chunk, fn)
        chunks = _get_chunks(*iterables, chunksize=chunksize)
        chunk_results = super().map(chunk_fn, chunks, timeout=timeout)
        # Flatten the per-chunk lists back into a single stream of results.
        return _chain_from_iterable_of_lists(chunk_results)

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True

        manager = self._queue_management_thread
        if manager:
            # Wake up queue management thread
            self._result_queue.put(None)
            if wait:
                manager.join()

        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        call_queue = self._call_queue
        if call_queue is not None:
            call_queue.close()
            if wait:
                call_queue.join_thread()
            self._call_queue = None
        self._result_queue = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
518
# Run _python_exit when the interpreter exits: it sets the module-level
# _shutdown flag, wakes every registered queue management thread and joins it.
atexit.register(_python_exit)