blob: 18a56f8524b4b6faeeb04f10fda9e467cb188d50 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module providing the `Pool` class for managing a process pool
3#
4# multiprocessing/pool.py
5#
R. David Murray3fc969a2010-12-14 01:38:16 +00006# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01007# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00008#
9
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010010__all__ = ['Pool', 'ThreadPool']
Benjamin Petersone711caf2008-06-11 16:44:04 +000011
12#
13# Imports
14#
15
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import collections
Victor Stinner9a8d1d72018-12-20 20:33:51 +010017import itertools
Charles-François Natali37cfb0a2013-06-28 19:25:45 +020018import os
Victor Stinner9a8d1d72018-12-20 20:33:51 +010019import queue
20import threading
Benjamin Petersone711caf2008-06-11 16:44:04 +000021import time
Richard Oudkerk85757832013-05-06 11:38:25 +010022import traceback
Victor Stinner9a8d1d72018-12-20 20:33:51 +010023import warnings
Benjamin Petersone711caf2008-06-11 16:44:04 +000024
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010025# If threading is available then ThreadPool should be provided. Therefore
26# we avoid top-level imports which are liable to fail on some systems.
27from . import util
Victor Stinner7fa767e2014-03-20 09:16:38 +010028from . import get_context, TimeoutError
Benjamin Petersone711caf2008-06-11 16:44:04 +000029
30#
31# Constants representing the state of a pool
32#
33
Victor Stinner9a8d1d72018-12-20 20:33:51 +010034INIT = "INIT"
Victor Stinner2b417fb2018-12-14 11:13:18 +010035RUN = "RUN"
36CLOSE = "CLOSE"
37TERMINATE = "TERMINATE"
Benjamin Petersone711caf2008-06-11 16:44:04 +000038
39#
40# Miscellaneous
41#
42
43job_counter = itertools.count()
44
45def mapstar(args):
46 return list(map(*args))
47
Antoine Pitroude911b22011-12-21 11:03:24 +010048def starmapstar(args):
49 return list(itertools.starmap(args[0], args[1]))
50
Benjamin Petersone711caf2008-06-11 16:44:04 +000051#
Richard Oudkerk85757832013-05-06 11:38:25 +010052# Hack to embed stringification of remote traceback in local traceback
53#
54
55class RemoteTraceback(Exception):
56 def __init__(self, tb):
57 self.tb = tb
58 def __str__(self):
59 return self.tb
60
61class ExceptionWithTraceback:
62 def __init__(self, exc, tb):
63 tb = traceback.format_exception(type(exc), exc, tb)
64 tb = ''.join(tb)
65 self.exc = exc
66 self.tb = '\n"""\n%s"""' % tb
67 def __reduce__(self):
68 return rebuild_exc, (self.exc, self.tb)
69
70def rebuild_exc(exc, tb):
71 exc.__cause__ = RemoteTraceback(tb)
72 return exc
73
74#
Benjamin Petersone711caf2008-06-11 16:44:04 +000075# Code run by worker processes
76#
77
Ask Solem2afcbf22010-11-09 20:55:52 +000078class MaybeEncodingError(Exception):
79 """Wraps possible unpickleable errors, so they can be
80 safely sent through the socket."""
81
82 def __init__(self, exc, value):
83 self.exc = repr(exc)
84 self.value = repr(value)
85 super(MaybeEncodingError, self).__init__(self.exc, self.value)
86
87 def __str__(self):
88 return "Error sending result: '%s'. Reason: '%s'" % (self.value,
89 self.exc)
90
91 def __repr__(self):
Serhiy Storchaka465e60e2014-07-25 23:36:00 +030092 return "<%s: %s>" % (self.__class__.__name__, self)
Ask Solem2afcbf22010-11-09 20:55:52 +000093
94
Richard Oudkerk80a5be12014-03-23 12:30:54 +000095def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
96 wrap_exception=False):
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -050097 if (maxtasks is not None) and not (isinstance(maxtasks, int)
98 and maxtasks >= 1):
99 raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000100 put = outqueue.put
101 get = inqueue.get
102 if hasattr(inqueue, '_writer'):
103 inqueue._writer.close()
104 outqueue._reader.close()
105
106 if initializer is not None:
107 initializer(*initargs)
108
Jesse Noller1f0b6582010-01-27 03:36:01 +0000109 completed = 0
110 while maxtasks is None or (maxtasks and completed < maxtasks):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000111 try:
112 task = get()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200113 except (EOFError, OSError):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100114 util.debug('worker got EOFError or OSError -- exiting')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000115 break
116
117 if task is None:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100118 util.debug('worker got sentinel -- exiting')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000119 break
120
121 job, i, func, args, kwds = task
122 try:
123 result = (True, func(*args, **kwds))
124 except Exception as e:
Xiang Zhang794623b2017-03-29 11:58:54 +0800125 if wrap_exception and func is not _helper_reraises_exception:
Richard Oudkerk80a5be12014-03-23 12:30:54 +0000126 e = ExceptionWithTraceback(e, e.__traceback__)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000127 result = (False, e)
Ask Solem2afcbf22010-11-09 20:55:52 +0000128 try:
129 put((job, i, result))
130 except Exception as e:
131 wrapped = MaybeEncodingError(e, result[1])
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100132 util.debug("Possible encoding error while sending result: %s" % (
Ask Solem2afcbf22010-11-09 20:55:52 +0000133 wrapped))
134 put((job, i, (False, wrapped)))
Antoine Pitrou89889452017-03-24 13:52:11 +0100135
136 task = job = result = func = args = kwds = None
Jesse Noller1f0b6582010-01-27 03:36:01 +0000137 completed += 1
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100138 util.debug('worker exiting after %d tasks' % completed)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000139
Xiang Zhang794623b2017-03-29 11:58:54 +0800140def _helper_reraises_exception(ex):
141 'Pickle-able helper function for use by _guarded_task_generation.'
142 raise ex
143
Benjamin Petersone711caf2008-06-11 16:44:04 +0000144#
145# Class representing a process pool
146#
147
148class Pool(object):
149 '''
Georg Brandl92905032008-11-22 08:51:39 +0000150 Class which supports an async version of applying functions to arguments.
Benjamin Petersone711caf2008-06-11 16:44:04 +0000151 '''
Richard Oudkerk80a5be12014-03-23 12:30:54 +0000152 _wrap_exception = True
153
Pablo Galindo3766f182019-02-11 17:29:00 +0000154 @staticmethod
155 def Process(ctx, *args, **kwds):
156 return ctx.Process(*args, **kwds)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000157
Jesse Noller1f0b6582010-01-27 03:36:01 +0000158 def __init__(self, processes=None, initializer=None, initargs=(),
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100159 maxtasksperchild=None, context=None):
Victor Stinner9a8d1d72018-12-20 20:33:51 +0100160 # Attributes initialized early to make sure that they exist in
161 # __del__() if __init__() raises an exception
162 self._pool = []
163 self._state = INIT
164
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100165 self._ctx = context or get_context()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000166 self._setup_queues()
Antoine Pitrouab745042018-01-18 10:38:03 +0100167 self._taskqueue = queue.SimpleQueue()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000168 self._cache = {}
Jesse Noller1f0b6582010-01-27 03:36:01 +0000169 self._maxtasksperchild = maxtasksperchild
170 self._initializer = initializer
171 self._initargs = initargs
Benjamin Petersone711caf2008-06-11 16:44:04 +0000172
173 if processes is None:
Charles-François Natali37cfb0a2013-06-28 19:25:45 +0200174 processes = os.cpu_count() or 1
Victor Stinner2fae27b2011-06-20 17:53:35 +0200175 if processes < 1:
176 raise ValueError("Number of processes must be at least 1")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000177
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200178 if initializer is not None and not callable(initializer):
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000179 raise TypeError('initializer must be a callable')
180
Jesse Noller1f0b6582010-01-27 03:36:01 +0000181 self._processes = processes
Julien Palard5d236ca2018-11-04 23:40:32 +0100182 try:
183 self._repopulate_pool()
184 except Exception:
185 for p in self._pool:
186 if p.exitcode is None:
187 p.terminate()
188 for p in self._pool:
189 p.join()
190 raise
Jesse Noller1f0b6582010-01-27 03:36:01 +0000191
192 self._worker_handler = threading.Thread(
193 target=Pool._handle_workers,
Pablo Galindo3766f182019-02-11 17:29:00 +0000194 args=(self._cache, self._taskqueue, self._ctx, self.Process,
195 self._processes, self._pool, self._inqueue, self._outqueue,
196 self._initializer, self._initargs, self._maxtasksperchild,
197 self._wrap_exception)
Jesse Noller1f0b6582010-01-27 03:36:01 +0000198 )
199 self._worker_handler.daemon = True
200 self._worker_handler._state = RUN
201 self._worker_handler.start()
202
Victor Stinner9dfc7542018-12-06 08:51:47 +0100203
Benjamin Petersone711caf2008-06-11 16:44:04 +0000204 self._task_handler = threading.Thread(
205 target=Pool._handle_tasks,
Richard Oudkerke90cedb2013-10-28 23:11:58 +0000206 args=(self._taskqueue, self._quick_put, self._outqueue,
207 self._pool, self._cache)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000208 )
Benjamin Petersonfae4c622008-08-18 18:40:08 +0000209 self._task_handler.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000210 self._task_handler._state = RUN
211 self._task_handler.start()
212
213 self._result_handler = threading.Thread(
214 target=Pool._handle_results,
215 args=(self._outqueue, self._quick_get, self._cache)
216 )
Benjamin Petersonfae4c622008-08-18 18:40:08 +0000217 self._result_handler.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000218 self._result_handler._state = RUN
219 self._result_handler.start()
220
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100221 self._terminate = util.Finalize(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000222 self, self._terminate_pool,
223 args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
Jesse Noller1f0b6582010-01-27 03:36:01 +0000224 self._worker_handler, self._task_handler,
225 self._result_handler, self._cache),
Benjamin Petersone711caf2008-06-11 16:44:04 +0000226 exitpriority=15
227 )
Victor Stinner9a8d1d72018-12-20 20:33:51 +0100228 self._state = RUN
229
230 # Copy globals as function locals to make sure that they are available
231 # during Python shutdown when the Pool is destroyed.
232 def __del__(self, _warn=warnings.warn, RUN=RUN):
233 if self._state == RUN:
234 _warn(f"unclosed running multiprocessing pool {self!r}",
235 ResourceWarning, source=self)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000236
Victor Stinner2b417fb2018-12-14 11:13:18 +0100237 def __repr__(self):
238 cls = self.__class__
239 return (f'<{cls.__module__}.{cls.__qualname__} '
240 f'state={self._state} '
241 f'pool_size={len(self._pool)}>')
242
Pablo Galindo3766f182019-02-11 17:29:00 +0000243 @staticmethod
244 def _join_exited_workers(pool):
Jesse Noller1f0b6582010-01-27 03:36:01 +0000245 """Cleanup after any worker processes which have exited due to reaching
246 their specified lifetime. Returns True if any workers were cleaned up.
247 """
248 cleaned = False
Pablo Galindo3766f182019-02-11 17:29:00 +0000249 for i in reversed(range(len(pool))):
250 worker = pool[i]
Jesse Noller1f0b6582010-01-27 03:36:01 +0000251 if worker.exitcode is not None:
252 # worker exited
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100253 util.debug('cleaning up worker %d' % i)
Jesse Noller1f0b6582010-01-27 03:36:01 +0000254 worker.join()
255 cleaned = True
Pablo Galindo3766f182019-02-11 17:29:00 +0000256 del pool[i]
Jesse Noller1f0b6582010-01-27 03:36:01 +0000257 return cleaned
258
259 def _repopulate_pool(self):
Pablo Galindo3766f182019-02-11 17:29:00 +0000260 return self._repopulate_pool_static(self._ctx, self.Process,
261 self._processes,
262 self._pool, self._inqueue,
263 self._outqueue, self._initializer,
264 self._initargs,
265 self._maxtasksperchild,
266 self._wrap_exception)
267
268 @staticmethod
269 def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
270 outqueue, initializer, initargs,
271 maxtasksperchild, wrap_exception):
Jesse Noller1f0b6582010-01-27 03:36:01 +0000272 """Bring the number of pool processes up to the specified number,
273 for use after reaping workers which have exited.
274 """
Pablo Galindo3766f182019-02-11 17:29:00 +0000275 for i in range(processes - len(pool)):
276 w = Process(ctx, target=worker,
277 args=(inqueue, outqueue,
278 initializer,
279 initargs, maxtasksperchild,
280 wrap_exception))
Jesse Noller1f0b6582010-01-27 03:36:01 +0000281 w.name = w.name.replace('Process', 'PoolWorker')
282 w.daemon = True
283 w.start()
Pablo Galindo3766f182019-02-11 17:29:00 +0000284 pool.append(w)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100285 util.debug('added worker')
Jesse Noller1f0b6582010-01-27 03:36:01 +0000286
Pablo Galindo3766f182019-02-11 17:29:00 +0000287 @staticmethod
288 def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
289 initializer, initargs, maxtasksperchild,
290 wrap_exception):
Jesse Noller1f0b6582010-01-27 03:36:01 +0000291 """Clean up any exited workers and start replacements for them.
292 """
Pablo Galindo3766f182019-02-11 17:29:00 +0000293 if Pool._join_exited_workers(pool):
294 Pool._repopulate_pool_static(ctx, Process, processes, pool,
295 inqueue, outqueue, initializer,
296 initargs, maxtasksperchild,
297 wrap_exception)
Jesse Noller1f0b6582010-01-27 03:36:01 +0000298
Benjamin Petersone711caf2008-06-11 16:44:04 +0000299 def _setup_queues(self):
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100300 self._inqueue = self._ctx.SimpleQueue()
301 self._outqueue = self._ctx.SimpleQueue()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000302 self._quick_put = self._inqueue._writer.send
303 self._quick_get = self._outqueue._reader.recv
304
Victor Stinner08c2ba02018-12-13 02:15:30 +0100305 def _check_running(self):
306 if self._state != RUN:
307 raise ValueError("Pool not running")
308
Benjamin Petersone711caf2008-06-11 16:44:04 +0000309 def apply(self, func, args=(), kwds={}):
310 '''
Georg Brandl92905032008-11-22 08:51:39 +0000311 Equivalent of `func(*args, **kwds)`.
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500312 Pool must be running.
Benjamin Petersone711caf2008-06-11 16:44:04 +0000313 '''
Benjamin Petersone711caf2008-06-11 16:44:04 +0000314 return self.apply_async(func, args, kwds).get()
315
316 def map(self, func, iterable, chunksize=None):
317 '''
Georg Brandl92905032008-11-22 08:51:39 +0000318 Apply `func` to each element in `iterable`, collecting the results
319 in a list that is returned.
Benjamin Petersone711caf2008-06-11 16:44:04 +0000320 '''
Antoine Pitroude911b22011-12-21 11:03:24 +0100321 return self._map_async(func, iterable, mapstar, chunksize).get()
322
323 def starmap(self, func, iterable, chunksize=None):
324 '''
325 Like `map()` method but the elements of the `iterable` are expected to
326 be iterables as well and will be unpacked as arguments. Hence
327 `func` and (a, b) becomes func(a, b).
328 '''
Antoine Pitroude911b22011-12-21 11:03:24 +0100329 return self._map_async(func, iterable, starmapstar, chunksize).get()
330
331 def starmap_async(self, func, iterable, chunksize=None, callback=None,
332 error_callback=None):
333 '''
334 Asynchronous version of `starmap()` method.
335 '''
Antoine Pitroude911b22011-12-21 11:03:24 +0100336 return self._map_async(func, iterable, starmapstar, chunksize,
337 callback, error_callback)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000338
Xiang Zhang794623b2017-03-29 11:58:54 +0800339 def _guarded_task_generation(self, result_job, func, iterable):
340 '''Provides a generator of tasks for imap and imap_unordered with
341 appropriate handling for iterables which throw exceptions during
342 iteration.'''
343 try:
344 i = -1
345 for i, x in enumerate(iterable):
346 yield (result_job, i, func, (x,), {})
347 except Exception as e:
348 yield (result_job, i+1, _helper_reraises_exception, (e,), {})
349
Benjamin Petersone711caf2008-06-11 16:44:04 +0000350 def imap(self, func, iterable, chunksize=1):
351 '''
Georg Brandl92905032008-11-22 08:51:39 +0000352 Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
Benjamin Petersone711caf2008-06-11 16:44:04 +0000353 '''
Victor Stinner08c2ba02018-12-13 02:15:30 +0100354 self._check_running()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000355 if chunksize == 1:
Pablo Galindo3766f182019-02-11 17:29:00 +0000356 result = IMapIterator(self)
Xiang Zhang794623b2017-03-29 11:58:54 +0800357 self._taskqueue.put(
358 (
359 self._guarded_task_generation(result._job, func, iterable),
360 result._set_length
361 ))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000362 return result
363 else:
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500364 if chunksize < 1:
365 raise ValueError(
366 "Chunksize must be 1+, not {0:n}".format(
367 chunksize))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000368 task_batches = Pool._get_tasks(func, iterable, chunksize)
Pablo Galindo3766f182019-02-11 17:29:00 +0000369 result = IMapIterator(self)
Xiang Zhang794623b2017-03-29 11:58:54 +0800370 self._taskqueue.put(
371 (
372 self._guarded_task_generation(result._job,
373 mapstar,
374 task_batches),
375 result._set_length
376 ))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000377 return (item for chunk in result for item in chunk)
378
379 def imap_unordered(self, func, iterable, chunksize=1):
380 '''
Georg Brandl92905032008-11-22 08:51:39 +0000381 Like `imap()` method but ordering of results is arbitrary.
Benjamin Petersone711caf2008-06-11 16:44:04 +0000382 '''
Victor Stinner08c2ba02018-12-13 02:15:30 +0100383 self._check_running()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000384 if chunksize == 1:
Pablo Galindo3766f182019-02-11 17:29:00 +0000385 result = IMapUnorderedIterator(self)
Xiang Zhang794623b2017-03-29 11:58:54 +0800386 self._taskqueue.put(
387 (
388 self._guarded_task_generation(result._job, func, iterable),
389 result._set_length
390 ))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000391 return result
392 else:
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500393 if chunksize < 1:
394 raise ValueError(
395 "Chunksize must be 1+, not {0!r}".format(chunksize))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000396 task_batches = Pool._get_tasks(func, iterable, chunksize)
Pablo Galindo3766f182019-02-11 17:29:00 +0000397 result = IMapUnorderedIterator(self)
Xiang Zhang794623b2017-03-29 11:58:54 +0800398 self._taskqueue.put(
399 (
400 self._guarded_task_generation(result._job,
401 mapstar,
402 task_batches),
403 result._set_length
404 ))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000405 return (item for chunk in result for item in chunk)
406
Ask Solem2afcbf22010-11-09 20:55:52 +0000407 def apply_async(self, func, args=(), kwds={}, callback=None,
408 error_callback=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000409 '''
Georg Brandl92905032008-11-22 08:51:39 +0000410 Asynchronous version of `apply()` method.
Benjamin Petersone711caf2008-06-11 16:44:04 +0000411 '''
Victor Stinner08c2ba02018-12-13 02:15:30 +0100412 self._check_running()
Pablo Galindo3766f182019-02-11 17:29:00 +0000413 result = ApplyResult(self, callback, error_callback)
Xiang Zhang794623b2017-03-29 11:58:54 +0800414 self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000415 return result
416
Ask Solem2afcbf22010-11-09 20:55:52 +0000417 def map_async(self, func, iterable, chunksize=None, callback=None,
418 error_callback=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000419 '''
Georg Brandl92905032008-11-22 08:51:39 +0000420 Asynchronous version of `map()` method.
Benjamin Petersone711caf2008-06-11 16:44:04 +0000421 '''
Hynek Schlawack254af262012-10-27 12:53:02 +0200422 return self._map_async(func, iterable, mapstar, chunksize, callback,
423 error_callback)
Antoine Pitroude911b22011-12-21 11:03:24 +0100424
425 def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
426 error_callback=None):
427 '''
428 Helper function to implement map, starmap and their async counterparts.
429 '''
Victor Stinner08c2ba02018-12-13 02:15:30 +0100430 self._check_running()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000431 if not hasattr(iterable, '__len__'):
432 iterable = list(iterable)
433
434 if chunksize is None:
435 chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
436 if extra:
437 chunksize += 1
Alexandre Vassalottie52e3782009-07-17 09:18:18 +0000438 if len(iterable) == 0:
439 chunksize = 0
Benjamin Petersone711caf2008-06-11 16:44:04 +0000440
441 task_batches = Pool._get_tasks(func, iterable, chunksize)
Pablo Galindo3766f182019-02-11 17:29:00 +0000442 result = MapResult(self, chunksize, len(iterable), callback,
Ask Solem2afcbf22010-11-09 20:55:52 +0000443 error_callback=error_callback)
Xiang Zhang794623b2017-03-29 11:58:54 +0800444 self._taskqueue.put(
445 (
446 self._guarded_task_generation(result._job,
447 mapper,
448 task_batches),
449 None
450 )
451 )
Benjamin Petersone711caf2008-06-11 16:44:04 +0000452 return result
453
454 @staticmethod
Pablo Galindo3766f182019-02-11 17:29:00 +0000455 def _handle_workers(cache, taskqueue, ctx, Process, processes, pool,
456 inqueue, outqueue, initializer, initargs,
457 maxtasksperchild, wrap_exception):
Charles-François Natalif8859e12011-10-24 18:45:29 +0200458 thread = threading.current_thread()
459
460 # Keep maintaining workers until the cache gets drained, unless the pool
461 # is terminated.
Pablo Galindo3766f182019-02-11 17:29:00 +0000462 while thread._state == RUN or (cache and thread._state != TERMINATE):
463 Pool._maintain_pool(ctx, Process, processes, pool, inqueue,
464 outqueue, initializer, initargs,
465 maxtasksperchild, wrap_exception)
Jesse Noller1f0b6582010-01-27 03:36:01 +0000466 time.sleep(0.1)
Antoine Pitrou81dee6b2011-04-11 00:18:59 +0200467 # send sentinel to stop workers
Pablo Galindo3766f182019-02-11 17:29:00 +0000468 taskqueue.put(None)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100469 util.debug('worker handler exiting')
Jesse Noller1f0b6582010-01-27 03:36:01 +0000470
471 @staticmethod
Richard Oudkerke90cedb2013-10-28 23:11:58 +0000472 def _handle_tasks(taskqueue, put, outqueue, pool, cache):
Benjamin Peterson672b8032008-06-11 19:14:14 +0000473 thread = threading.current_thread()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000474
475 for taskseq, set_length in iter(taskqueue.get, None):
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +0200476 task = None
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +0200477 try:
Xiang Zhang794623b2017-03-29 11:58:54 +0800478 # iterating taskseq cannot fail
479 for task in taskseq:
Victor Stinner2b417fb2018-12-14 11:13:18 +0100480 if thread._state != RUN:
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +0200481 util.debug('task handler found thread._state != RUN')
482 break
Richard Oudkerke90cedb2013-10-28 23:11:58 +0000483 try:
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +0200484 put(task)
485 except Exception as e:
Xiang Zhang794623b2017-03-29 11:58:54 +0800486 job, idx = task[:2]
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +0200487 try:
Xiang Zhang794623b2017-03-29 11:58:54 +0800488 cache[job]._set(idx, (False, e))
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +0200489 except KeyError:
490 pass
491 else:
492 if set_length:
493 util.debug('doing set_length()')
Xiang Zhang794623b2017-03-29 11:58:54 +0800494 idx = task[1] if task else -1
495 set_length(idx + 1)
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +0200496 continue
497 break
Antoine Pitrou89889452017-03-24 13:52:11 +0100498 finally:
499 task = taskseq = job = None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000500 else:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100501 util.debug('task handler got sentinel')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000502
Benjamin Petersone711caf2008-06-11 16:44:04 +0000503 try:
504 # tell result handler to finish when cache is empty
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100505 util.debug('task handler sending sentinel to result handler')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000506 outqueue.put(None)
507
508 # tell workers there is no more work
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100509 util.debug('task handler sending sentinel to workers')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000510 for p in pool:
511 put(None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200512 except OSError:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100513 util.debug('task handler got OSError when sending sentinels')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000514
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100515 util.debug('task handler exiting')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000516
517 @staticmethod
518 def _handle_results(outqueue, get, cache):
Benjamin Peterson672b8032008-06-11 19:14:14 +0000519 thread = threading.current_thread()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000520
521 while 1:
522 try:
523 task = get()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200524 except (OSError, EOFError):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100525 util.debug('result handler got EOFError/OSError -- exiting')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000526 return
527
Victor Stinner2dfe3512018-12-16 23:40:49 +0100528 if thread._state != RUN:
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500529 assert thread._state == TERMINATE, "Thread not in TERMINATE"
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100530 util.debug('result handler found thread._state=TERMINATE')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000531 break
532
533 if task is None:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100534 util.debug('result handler got sentinel')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000535 break
536
537 job, i, obj = task
538 try:
539 cache[job]._set(i, obj)
540 except KeyError:
541 pass
Antoine Pitrou89889452017-03-24 13:52:11 +0100542 task = job = obj = None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000543
544 while cache and thread._state != TERMINATE:
545 try:
546 task = get()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200547 except (OSError, EOFError):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100548 util.debug('result handler got EOFError/OSError -- exiting')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000549 return
550
551 if task is None:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100552 util.debug('result handler ignoring extra sentinel')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000553 continue
554 job, i, obj = task
555 try:
556 cache[job]._set(i, obj)
557 except KeyError:
558 pass
Antoine Pitrou89889452017-03-24 13:52:11 +0100559 task = job = obj = None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000560
561 if hasattr(outqueue, '_reader'):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100562 util.debug('ensuring that outqueue is not full')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000563 # If we don't make room available in outqueue then
564 # attempts to add the sentinel (None) to outqueue may
565 # block. There is guaranteed to be no more than 2 sentinels.
566 try:
567 for i in range(10):
568 if not outqueue._reader.poll():
569 break
570 get()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200571 except (OSError, EOFError):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000572 pass
573
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100574 util.debug('result handler exiting: len(cache)=%s, thread._state=%s',
Benjamin Petersone711caf2008-06-11 16:44:04 +0000575 len(cache), thread._state)
576
577 @staticmethod
578 def _get_tasks(func, it, size):
579 it = iter(it)
580 while 1:
581 x = tuple(itertools.islice(it, size))
582 if not x:
583 return
584 yield (func, x)
585
586 def __reduce__(self):
587 raise NotImplementedError(
588 'pool objects cannot be passed between processes or pickled'
589 )
590
591 def close(self):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100592 util.debug('closing pool')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000593 if self._state == RUN:
594 self._state = CLOSE
Jesse Noller1f0b6582010-01-27 03:36:01 +0000595 self._worker_handler._state = CLOSE
Benjamin Petersone711caf2008-06-11 16:44:04 +0000596
597 def terminate(self):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100598 util.debug('terminating pool')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000599 self._state = TERMINATE
Jesse Noller1f0b6582010-01-27 03:36:01 +0000600 self._worker_handler._state = TERMINATE
Benjamin Petersone711caf2008-06-11 16:44:04 +0000601 self._terminate()
602
603 def join(self):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100604 util.debug('joining pool')
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500605 if self._state == RUN:
606 raise ValueError("Pool is still running")
607 elif self._state not in (CLOSE, TERMINATE):
608 raise ValueError("In unknown state")
Jesse Noller1f0b6582010-01-27 03:36:01 +0000609 self._worker_handler.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000610 self._task_handler.join()
611 self._result_handler.join()
612 for p in self._pool:
613 p.join()
614
615 @staticmethod
616 def _help_stuff_finish(inqueue, task_handler, size):
617 # task_handler may be blocked trying to put items on inqueue
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100618 util.debug('removing tasks from inqueue until task handler finished')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000619 inqueue._rlock.acquire()
Benjamin Peterson672b8032008-06-11 19:14:14 +0000620 while task_handler.is_alive() and inqueue._reader.poll():
Benjamin Petersone711caf2008-06-11 16:44:04 +0000621 inqueue._reader.recv()
622 time.sleep(0)
623
624 @classmethod
625 def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
Jesse Noller1f0b6582010-01-27 03:36:01 +0000626 worker_handler, task_handler, result_handler, cache):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000627 # this is guaranteed to only be called once
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100628 util.debug('finalizing pool')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000629
Jesse Noller1f0b6582010-01-27 03:36:01 +0000630 worker_handler._state = TERMINATE
Benjamin Petersone711caf2008-06-11 16:44:04 +0000631 task_handler._state = TERMINATE
Benjamin Petersone711caf2008-06-11 16:44:04 +0000632
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100633 util.debug('helping task handler/workers to finish')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000634 cls._help_stuff_finish(inqueue, task_handler, len(pool))
635
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500636 if (not result_handler.is_alive()) and (len(cache) != 0):
637 raise AssertionError(
638 "Cannot have cache with result_hander not alive")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000639
640 result_handler._state = TERMINATE
641 outqueue.put(None) # sentinel
642
Antoine Pitrou81dee6b2011-04-11 00:18:59 +0200643 # We must wait for the worker handler to exit before terminating
644 # workers because we don't want workers to be restarted behind our back.
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100645 util.debug('joining worker handler')
Richard Oudkerkf29ec4b2012-06-18 15:54:57 +0100646 if threading.current_thread() is not worker_handler:
647 worker_handler.join()
Antoine Pitrou81dee6b2011-04-11 00:18:59 +0200648
Jesse Noller1f0b6582010-01-27 03:36:01 +0000649 # Terminate workers which haven't already finished.
Benjamin Petersone711caf2008-06-11 16:44:04 +0000650 if pool and hasattr(pool[0], 'terminate'):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100651 util.debug('terminating workers')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000652 for p in pool:
Jesse Noller1f0b6582010-01-27 03:36:01 +0000653 if p.exitcode is None:
654 p.terminate()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000655
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100656 util.debug('joining task handler')
Richard Oudkerkf29ec4b2012-06-18 15:54:57 +0100657 if threading.current_thread() is not task_handler:
658 task_handler.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000659
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100660 util.debug('joining result handler')
Richard Oudkerkf29ec4b2012-06-18 15:54:57 +0100661 if threading.current_thread() is not result_handler:
662 result_handler.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000663
664 if pool and hasattr(pool[0], 'terminate'):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100665 util.debug('joining pool workers')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000666 for p in pool:
Florent Xicluna998171f2010-03-08 13:32:17 +0000667 if p.is_alive():
Jesse Noller1f0b6582010-01-27 03:36:01 +0000668 # worker has not yet exited
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100669 util.debug('cleaning up worker %d' % p.pid)
Florent Xicluna998171f2010-03-08 13:32:17 +0000670 p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000671
Richard Oudkerkd69cfe82012-06-18 17:47:52 +0100672 def __enter__(self):
Victor Stinner08c2ba02018-12-13 02:15:30 +0100673 self._check_running()
Richard Oudkerkd69cfe82012-06-18 17:47:52 +0100674 return self
675
676 def __exit__(self, exc_type, exc_val, exc_tb):
677 self.terminate()
678
Benjamin Petersone711caf2008-06-11 16:44:04 +0000679#
680# Class whose instances are returned by `Pool.apply_async()`
681#
682
683class ApplyResult(object):
684
Pablo Galindo3766f182019-02-11 17:29:00 +0000685 def __init__(self, pool, callback, error_callback):
686 self._pool = pool
Richard Oudkerk692130a2012-05-25 13:26:53 +0100687 self._event = threading.Event()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000688 self._job = next(job_counter)
Pablo Galindo3766f182019-02-11 17:29:00 +0000689 self._cache = pool._cache
Benjamin Petersone711caf2008-06-11 16:44:04 +0000690 self._callback = callback
Ask Solem2afcbf22010-11-09 20:55:52 +0000691 self._error_callback = error_callback
Pablo Galindo3766f182019-02-11 17:29:00 +0000692 self._cache[self._job] = self
Benjamin Petersone711caf2008-06-11 16:44:04 +0000693
694 def ready(self):
Richard Oudkerk692130a2012-05-25 13:26:53 +0100695 return self._event.is_set()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000696
697 def successful(self):
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500698 if not self.ready():
699 raise ValueError("{0!r} not ready".format(self))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000700 return self._success
701
702 def wait(self, timeout=None):
Richard Oudkerk692130a2012-05-25 13:26:53 +0100703 self._event.wait(timeout)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000704
705 def get(self, timeout=None):
706 self.wait(timeout)
Richard Oudkerk692130a2012-05-25 13:26:53 +0100707 if not self.ready():
Benjamin Petersone711caf2008-06-11 16:44:04 +0000708 raise TimeoutError
709 if self._success:
710 return self._value
711 else:
712 raise self._value
713
714 def _set(self, i, obj):
715 self._success, self._value = obj
716 if self._callback and self._success:
717 self._callback(self._value)
Ask Solem2afcbf22010-11-09 20:55:52 +0000718 if self._error_callback and not self._success:
719 self._error_callback(self._value)
Richard Oudkerk692130a2012-05-25 13:26:53 +0100720 self._event.set()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000721 del self._cache[self._job]
Pablo Galindo3766f182019-02-11 17:29:00 +0000722 self._pool = None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000723
Richard Oudkerkdef51ca2013-05-06 12:10:04 +0100724AsyncResult = ApplyResult # create alias -- see #17805
725
Benjamin Petersone711caf2008-06-11 16:44:04 +0000726#
727# Class whose instances are returned by `Pool.map_async()`
728#
729
730class MapResult(ApplyResult):
731
Pablo Galindo3766f182019-02-11 17:29:00 +0000732 def __init__(self, pool, chunksize, length, callback, error_callback):
733 ApplyResult.__init__(self, pool, callback,
Ask Solem2afcbf22010-11-09 20:55:52 +0000734 error_callback=error_callback)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000735 self._success = True
736 self._value = [None] * length
737 self._chunksize = chunksize
738 if chunksize <= 0:
739 self._number_left = 0
Richard Oudkerk692130a2012-05-25 13:26:53 +0100740 self._event.set()
Pablo Galindo3766f182019-02-11 17:29:00 +0000741 del self._cache[self._job]
Benjamin Petersone711caf2008-06-11 16:44:04 +0000742 else:
743 self._number_left = length//chunksize + bool(length % chunksize)
744
745 def _set(self, i, success_result):
Charles-François Natali78f55ff2016-02-10 22:58:18 +0000746 self._number_left -= 1
Benjamin Petersone711caf2008-06-11 16:44:04 +0000747 success, result = success_result
Charles-François Natali78f55ff2016-02-10 22:58:18 +0000748 if success and self._success:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000749 self._value[i*self._chunksize:(i+1)*self._chunksize] = result
Benjamin Petersone711caf2008-06-11 16:44:04 +0000750 if self._number_left == 0:
751 if self._callback:
752 self._callback(self._value)
753 del self._cache[self._job]
Richard Oudkerk692130a2012-05-25 13:26:53 +0100754 self._event.set()
Pablo Galindo3766f182019-02-11 17:29:00 +0000755 self._pool = None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000756 else:
Charles-François Natali78f55ff2016-02-10 22:58:18 +0000757 if not success and self._success:
758 # only store first exception
759 self._success = False
760 self._value = result
761 if self._number_left == 0:
762 # only consider the result ready once all jobs are done
763 if self._error_callback:
764 self._error_callback(self._value)
765 del self._cache[self._job]
766 self._event.set()
Pablo Galindo3766f182019-02-11 17:29:00 +0000767 self._pool = None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000768
769#
770# Class whose instances are returned by `Pool.imap()`
771#
772
773class IMapIterator(object):
774
Pablo Galindo3766f182019-02-11 17:29:00 +0000775 def __init__(self, pool):
776 self._pool = pool
Benjamin Petersone711caf2008-06-11 16:44:04 +0000777 self._cond = threading.Condition(threading.Lock())
778 self._job = next(job_counter)
Pablo Galindo3766f182019-02-11 17:29:00 +0000779 self._cache = pool._cache
Benjamin Petersone711caf2008-06-11 16:44:04 +0000780 self._items = collections.deque()
781 self._index = 0
782 self._length = None
783 self._unsorted = {}
Pablo Galindo3766f182019-02-11 17:29:00 +0000784 self._cache[self._job] = self
Benjamin Petersone711caf2008-06-11 16:44:04 +0000785
786 def __iter__(self):
787 return self
788
789 def next(self, timeout=None):
Charles-François Natalia924fc72014-05-25 14:12:12 +0100790 with self._cond:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000791 try:
792 item = self._items.popleft()
793 except IndexError:
794 if self._index == self._length:
Pablo Galindo3766f182019-02-11 17:29:00 +0000795 self._pool = None
Serhiy Storchaka5affd232017-04-05 09:37:24 +0300796 raise StopIteration from None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000797 self._cond.wait(timeout)
798 try:
799 item = self._items.popleft()
800 except IndexError:
801 if self._index == self._length:
Pablo Galindo3766f182019-02-11 17:29:00 +0000802 self._pool = None
Serhiy Storchaka5affd232017-04-05 09:37:24 +0300803 raise StopIteration from None
804 raise TimeoutError from None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000805
806 success, value = item
807 if success:
808 return value
809 raise value
810
811 __next__ = next # XXX
812
813 def _set(self, i, obj):
Charles-François Natalia924fc72014-05-25 14:12:12 +0100814 with self._cond:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000815 if self._index == i:
816 self._items.append(obj)
817 self._index += 1
818 while self._index in self._unsorted:
819 obj = self._unsorted.pop(self._index)
820 self._items.append(obj)
821 self._index += 1
822 self._cond.notify()
823 else:
824 self._unsorted[i] = obj
825
826 if self._index == self._length:
827 del self._cache[self._job]
Pablo Galindo3766f182019-02-11 17:29:00 +0000828 self._pool = None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000829
830 def _set_length(self, length):
Charles-François Natalia924fc72014-05-25 14:12:12 +0100831 with self._cond:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000832 self._length = length
833 if self._index == self._length:
834 self._cond.notify()
835 del self._cache[self._job]
Pablo Galindo3766f182019-02-11 17:29:00 +0000836 self._pool = None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000837
838#
839# Class whose instances are returned by `Pool.imap_unordered()`
840#
841
842class IMapUnorderedIterator(IMapIterator):
843
844 def _set(self, i, obj):
Charles-François Natalia924fc72014-05-25 14:12:12 +0100845 with self._cond:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000846 self._items.append(obj)
847 self._index += 1
848 self._cond.notify()
849 if self._index == self._length:
850 del self._cache[self._job]
Pablo Galindo3766f182019-02-11 17:29:00 +0000851 self._pool = None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000852
853#
854#
855#
856
857class ThreadPool(Pool):
Richard Oudkerk80a5be12014-03-23 12:30:54 +0000858 _wrap_exception = False
Benjamin Petersone711caf2008-06-11 16:44:04 +0000859
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100860 @staticmethod
Pablo Galindo3766f182019-02-11 17:29:00 +0000861 def Process(ctx, *args, **kwds):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100862 from .dummy import Process
863 return Process(*args, **kwds)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000864
865 def __init__(self, processes=None, initializer=None, initargs=()):
866 Pool.__init__(self, processes, initializer, initargs)
867
868 def _setup_queues(self):
Antoine Pitrouab745042018-01-18 10:38:03 +0100869 self._inqueue = queue.SimpleQueue()
870 self._outqueue = queue.SimpleQueue()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000871 self._quick_put = self._inqueue.put
872 self._quick_get = self._outqueue.get
873
874 @staticmethod
875 def _help_stuff_finish(inqueue, task_handler, size):
Antoine Pitrouab745042018-01-18 10:38:03 +0100876 # drain inqueue, and put sentinels at its head to make workers finish
877 try:
878 while True:
879 inqueue.get(block=False)
880 except queue.Empty:
881 pass
882 for i in range(size):
883 inqueue.put(None)