# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+       +---------+
|          |  => | Work Ids |    => |        |  => | Call Q    |    => |         |
|          |     +----------+       |        |     +-----------+       |         |
|          |     | ...      |       |        |     | ...       |       |         |
|          |     | 6        |       |        |     | 5, call() |       |         |
|          |     | 7        |       |        |     | ...       |       |         |
| Process  |     | ...      |       | Local  |     +-----------+       | Process |
|  Pool    |     +----------+       | Worker |                         |  #1..n  |
| Executor |                        | Thread |                         |         |
|          |     +------------+     |        |     +-----------+       |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <=    |         |
|          |     +------------+     |        |     +-----------+       |         |
|          |     | 6: call()  |     |        |     | ...       |       |         |
|          |     |    future  |     |        |     | 4, result |       |         |
|          |     | ...        |     |        |     | 3, except |       |         |
+----------+     +------------+     +--------+     +-----------+       +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  _WorkItem from the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
"""
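
# A minimal usage sketch (illustrative only, not part of the module): the
# executor defined below is typically driven like this. The callable must be
# defined at module top level so it can be pickled for the worker processes,
# and on platforms that spawn fresh interpreters the pool should be created
# under an ``if __name__ == '__main__'`` guard.
#
#     from concurrent.futures import ProcessPoolExecutor
#
#     def square(x):
#         return x * x
#
#     if __name__ == '__main__':
#         with ProcessPoolExecutor(max_workers=2) as executor:
#             assert executor.submit(square, 3).result() == 9
#             assert list(executor.map(square, range(4))) == [0, 1, 4, 9]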

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

import atexit
import os
from concurrent.futures import _base
import queue
from queue import Full
import multiprocessing
from multiprocessing import SimpleQueue
from multiprocessing.connection import wait
import threading
import weakref
from functools import partial
import itertools
import traceback

# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.

_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False

def _python_exit():
    global _shutdown
    _shutdown = True
    items = list(_threads_queues.items())
    for t, q in items:
        q.put(None)
    for t, q in items:
        t.join()

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1

# Hack to embed stringification of remote traceback in local traceback

class _RemoteTraceback(Exception):
    def __init__(self, tb):
        self.tb = tb
    def __str__(self):
        return self.tb

class _ExceptionWithTraceback:
    def __init__(self, exc, tb):
        tb = traceback.format_exception(type(exc), exc, tb)
        tb = ''.join(tb)
        self.exc = exc
        self.tb = '\n"""\n%s"""' % tb
    def __reduce__(self):
        return _rebuild_exc, (self.exc, self.tb)

def _rebuild_exc(exc, tb):
    exc.__cause__ = _RemoteTraceback(tb)
    return exc

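# A minimal sketch of what the hack above buys: pickling an
# _ExceptionWithTraceback (as happens when a worker sends a failure back on
# the result queue) and unpickling it in the parent rebuilds the original
# exception with the remote traceback text attached as its __cause__.
#
#     import pickle
#     try:
#         1 / 0
#     except ZeroDivisionError as e:
#         wrapped = _ExceptionWithTraceback(e, e.__traceback__)
#     exc = pickle.loads(pickle.dumps(wrapped))  # unpickling calls _rebuild_exc()
#     assert isinstance(exc, ZeroDivisionError)
#     assert isinstance(exc.__cause__, _RemoteTraceback)
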
class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

class _ResultItem(object):
    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result

class _CallItem(object):
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

def _get_chunks(*iterables, chunksize):
    """ Iterates over zip()ed iterables in chunks. """
    it = zip(*iterables)
    while True:
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield chunk

def _process_chunk(fn, chunk):
    """ Processes a chunk of an iterable passed to map.

    Runs the function passed to map() on a chunk of the
    iterable passed to map.

    This function is run in a separate process.

    """
    return [fn(*args) for args in chunk]

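# A minimal sketch of how map() input is batched: _get_chunks() groups the
# zipped argument tuples into chunks of at most chunksize, and _process_chunk()
# applies fn to every argument tuple of one chunk inside a single worker call.
#
#     >>> list(_get_chunks([1, 2, 3], [10, 20, 30], chunksize=2))
#     [((1, 10), (2, 20)), ((3, 30),)]
#     >>> _process_chunk(pow, ((2, 3), (3, 2)))
#     [8, 9]
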
def _process_worker(call_queue, result_queue):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A multiprocessing.Queue of _ResultItems that will be
            written to by the worker.
    """
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            result_queue.put(_ResultItem(call_item.work_id, exception=exc))
        else:
            result_queue.put(_ResultItem(call_item.work_id,
                                         result=r))

def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Fills call_queue with _WorkItems from pending_work_items.

    This function never blocks.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while True:
        if call_queue.full():
            return
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        else:
            work_item = pending_work_items[work_id]

            if work_item.future.set_running_or_notify_cancel():
                call_queue.put(_CallItem(work_id,
                                         work_item.fn,
                                         work_item.args,
                                         work_item.kwargs),
                               block=True)
            else:
                del pending_work_items[work_id]
                continue

def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A dict mapping pids to the multiprocessing.Process instances
            used as workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A multiprocessing.Queue of _ResultItems generated by the
            process workers.
    """
    executor = None

    def shutting_down():
        return _shutdown or executor is None or executor._shutdown_thread

    def shutdown_worker():
        # This is an upper bound
        nb_children_alive = sum(p.is_alive() for p in processes.values())
        for i in range(0, nb_children_alive):
            call_queue.put_nowait(None)
        # Release the queue's resources as soon as possible.
        call_queue.close()
        # If .join() is not called on the created processes then
        # some multiprocessing.Queue methods may deadlock on Mac OS X.
        for p in processes.values():
            p.join()

    reader = result_queue._reader

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        sentinels = [p.sentinel for p in processes.values()]
        assert sentinels
        ready = wait([reader] + sentinels)
        if reader in ready:
            result_item = reader.recv()
        else:
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = True
                executor._shutdown_thread = True
                executor = None
            # All futures in flight must be marked failed
            for work_id, work_item in pending_work_items.items():
                work_item.future.set_exception(
                    BrokenProcessPool(
                        "A process in the process pool was "
                        "terminated abruptly while the future was "
                        "running or pending."
                    ))
                # Delete references to object. See issue16284
                del work_item
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return
        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284
                del work_item
        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if shutting_down():
            try:
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (in
                # result_queue.get()) and be able to send a sentinel again.
                pass
        executor = None

_system_limits_checked = False
_system_limited = None
def _check_system_limits():
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
    _system_limits_checked = True
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # undetermined limit, assume that the limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
    raise NotImplementedError(_system_limited)


class BrokenProcessPool(RuntimeError):
    """
    Raised when a process in a ProcessPoolExecutor terminated abruptly
    while a future was in the running state.
    """


class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.cpu_count() or 1
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")

            self._max_workers = max_workers

        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        self._call_queue = multiprocessing.Queue(self._max_workers +
                                                 EXTRA_QUEUED_CALLS)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = SimpleQueue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._broken = False
        self._queue_count = 0
        self._pending_work_items = {}

    def _start_queue_management_thread(self):
        # When the executor gets lost, the weakref callback will wake up
        # the queue management thread.
        def weakref_cb(_, q=self._result_queue):
            q.put(None)
        if self._queue_management_thread is None:
            # Start the processes so that their sentinels are known.
            self._adjust_process_count()
            self._queue_management_thread = threading.Thread(
                    target=_queue_management_worker,
                    args=(weakref.ref(self, weakref_cb),
                          self._processes,
                          self._pending_work_items,
                          self._work_ids,
                          self._call_queue,
                          self._result_queue))
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            _threads_queues[self._queue_management_thread] = self._result_queue

    def _adjust_process_count(self):
        for _ in range(len(self._processes), self._max_workers):
            p = multiprocessing.Process(
                    target=_process_worker,
                    args=(self._call_queue,
                          self._result_queue))
            p.start()
            self._processes[p.pid] = p

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool('A child process terminated '
                    'abruptly, the process pool is not usable anymore')
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._result_queue.put(None)

            self._start_queue_management_thread()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, *iterables).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the iterables will be sent one at a
                time.

        Returns:
            An iterator equivalent to: map(fn, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        results = super().map(partial(_process_chunk, fn),
                              _get_chunks(*iterables, chunksize=chunksize),
                              timeout=timeout)
        return itertools.chain.from_iterable(results)

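    # A rough sketch of the chunksize trade-off described above: for a large,
    # cheap-per-item workload, batching many items per _CallItem amortizes the
    # inter-process communication cost, at the price of coarser cancellation.
    #
    #     with ProcessPoolExecutor() as executor:
    #         results = list(executor.map(str, range(100000), chunksize=1000))
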
    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if self._queue_management_thread:
            # Wake up queue management thread
            self._result_queue.put(None)
            if wait:
                self._queue_management_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        self._call_queue = None
        self._result_queue = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__

atexit.register(_python_exit)