#
# Module providing the `Pool` class for managing a process pool
#
# multiprocessing/pool.py
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

__all__ = ['Pool']

#
# Imports
#

import threading
import queue
import itertools
import collections
import time

from multiprocessing import Process, cpu_count, TimeoutError
from multiprocessing.util import Finalize, debug

#
# Constants representing the state of a pool
#

RUN = 0
CLOSE = 1
TERMINATE = 2

#
# Miscellaneous
#

job_counter = itertools.count()

def mapstar(args):
    return list(map(*args))
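# For example, mapstar((abs, (-1, 2, -3))) == [1, 2, 3]; it unpacks the
# (func, chunk) batches produced by Pool._get_tasks() below.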

#
# Code run by worker processes
#

def worker(inqueue, outqueue, initializer=None, initargs=()):
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    while 1:
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as e:
            result = (False, e)
        put((job, i, result))

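# Illustrative sketch, not part of the original module: worker() speaks a
# simple tuple protocol.  Tasks arrive as (job, i, func, args, kwds) and
# results go back as (job, i, (success_flag, value_or_exception)); a None
# task is the shutdown sentinel.  With plain in-process queues:
#
#     >>> import queue
#     >>> inq, outq = queue.Queue(), queue.Queue()
#     >>> inq.put((0, 0, divmod, (7, 3), {}))
#     >>> inq.put(None)                    # sentinel -- the worker exits
#     >>> worker(inq, outq)
#     >>> outq.get()
#     (0, 0, (True, (2, 1)))
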
#
# Class representing a process pool
#

class Pool(object):
    '''
    Class which supports an async version of applying functions to arguments.
    '''
    Process = Process

    def __init__(self, processes=None, initializer=None, initargs=()):
        self._setup_queues()
        self._taskqueue = queue.Queue()
        self._cache = {}
        self._state = RUN

        if processes is None:
            try:
                processes = cpu_count()
            except NotImplementedError:
                processes = 1

        if initializer is not None and not hasattr(initializer, '__call__'):
            raise TypeError('initializer must be a callable')

        self._pool = []
        for i in range(processes):
            w = self.Process(
                target=worker,
                args=(self._inqueue, self._outqueue, initializer, initargs)
                )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()

        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._task_handler, self._result_handler, self._cache),
            exitpriority=15
            )

    def _setup_queues(self):
        from .queues import SimpleQueue
        self._inqueue = SimpleQueue()
        self._outqueue = SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv

    def apply(self, func, args=(), kwds={}):
        '''
        Equivalent of `func(*args, **kwds)`.
        '''
        assert self._state == RUN
        return self.apply_async(func, args, kwds).get()

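    # Illustrative usage, not part of the original source.  On platforms
    # that spawn rather than fork (e.g. Windows), pool creation must sit
    # under an ``if __name__ == '__main__'`` guard:
    #
    #     >>> p = Pool(2)
    #     >>> p.apply(divmod, (7, 3))      # blocks until the call returns
    #     (2, 1)
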
    def map(self, func, iterable, chunksize=None):
        '''
        Apply `func` to each element in `iterable`, collecting the results
        in a list that is returned.
        '''
        assert self._state == RUN
        return self.map_async(func, iterable, chunksize).get()

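    # Illustrative usage, not part of the original source.  `func` is
    # pickled and sent to the workers, so it must be defined at module
    # top level; with `p` the pool from the example above:
    #
    #     >>> p.map(abs, [-1, 2, -3])
    #     [1, 2, 3]
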
    def imap(self, func, iterable, chunksize=1):
        '''
        Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                         for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)

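    # Illustrative usage, not part of the original source.  imap() hands
    # results back lazily but still in order, as the workers produce them:
    #
    #     >>> for value in p.imap(abs, [-1, 2, -3]):
    #     ...     print(value)
    #     1
    #     2
    #     3
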
    def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like the `imap()` method, but the ordering of results is arbitrary.
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                         for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)

    def apply_async(self, func, args=(), kwds={}, callback=None):
        '''
        Asynchronous version of the `apply()` method.
        '''
        assert self._state == RUN
        result = ApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result

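    # Illustrative usage, not part of the original source.  The returned
    # ApplyResult is a future-like handle; `callback`, if supplied, runs
    # in the result-handler thread once the value arrives:
    #
    #     >>> r = p.apply_async(pow, (2, 10))
    #     >>> r.get(timeout=5)
    #     1024
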
    def map_async(self, func, iterable, chunksize=None, callback=None):
        '''
        Asynchronous version of the `map()` method.
        '''
        assert self._state == RUN
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)

        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1
        if len(iterable) == 0:
            chunksize = 0
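        # Worked example of the heuristic above: with 100 items and 4
        # workers, divmod(100, 16) == (6, 4); the nonzero remainder bumps
        # chunksize to 7, i.e. 14 full chunks of 7 plus one chunk of 2,
        # roughly four chunks per worker.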

        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self._cache, chunksize, len(iterable), callback)
        self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                              for i, x in enumerate(task_batches)), None))
        return result

    @staticmethod
    def _handle_tasks(taskqueue, put, outqueue, pool):
        thread = threading.current_thread()

        for taskseq, set_length in iter(taskqueue.get, None):
            i = -1
            for i, task in enumerate(taskseq):
                if thread._state:
                    debug('task handler found thread._state != RUN')
                    break
                try:
                    put(task)
                except IOError:
                    debug('could not put task on queue')
                    break
            else:
                # inner loop finished cleanly: report the real task count
                if set_length:
                    debug('doing set_length()')
                    set_length(i+1)
                continue
            # inner loop was broken out of: stop pulling task sequences
            break
        else:
            debug('task handler got sentinel')

        try:
            # tell result handler to finish when cache is empty
            debug('task handler sending sentinel to result handler')
            outqueue.put(None)

            # tell workers there is no more work
            debug('task handler sending sentinel to workers')
            for p in pool:
                put(None)
        except IOError:
            debug('task handler got IOError when sending sentinels')

        debug('task handler exiting')

    @staticmethod
    def _handle_results(outqueue, get, cache):
        thread = threading.current_thread()

        while 1:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if thread._state:
                assert thread._state == TERMINATE
                debug('result handler found thread._state=TERMINATE')
                break

            if task is None:
                debug('result handler got sentinel')
                break

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass

        while cache and thread._state != TERMINATE:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if task is None:
                debug('result handler ignoring extra sentinel')
                continue
            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass

        if hasattr(outqueue, '_reader'):
            debug('ensuring that outqueue is not full')
            # If we don't make room available in outqueue then
            # attempts to add the sentinel (None) to outqueue may
            # block.  There is guaranteed to be no more than 2 sentinels.
            try:
                for i in range(10):
                    if not outqueue._reader.poll():
                        break
                    get()
            except (IOError, EOFError):
                pass

        debug('result handler exiting: len(cache)=%s, thread._state=%s',
              len(cache), thread._state)

    @staticmethod
    def _get_tasks(func, it, size):
        it = iter(it)
        while 1:
            x = tuple(itertools.islice(it, size))
            if not x:
                return
            yield (func, x)

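    # Illustrative behaviour, not part of the original source:
    # _get_tasks(abs, range(5), 2) yields (abs, (0, 1)), (abs, (2, 3))
    # and finally the short batch (abs, (4,)).
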
    def __reduce__(self):
        raise NotImplementedError(
            'pool objects cannot be passed between processes or pickled'
            )

    def close(self):
        debug('closing pool')
        if self._state == RUN:
            self._state = CLOSE
            self._taskqueue.put(None)

    def terminate(self):
        debug('terminating pool')
        self._state = TERMINATE
        self._terminate()

    def join(self):
        debug('joining pool')
        assert self._state in (CLOSE, TERMINATE)
        self._task_handler.join()
        self._result_handler.join()
        for p in self._pool:
            p.join()

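    # Illustrative shutdown sequence, not part of the original source.
    # close() forbids new tasks and lets queued work drain; join() then
    # waits for the handler threads and workers; terminate() skips the
    # drain and kills the workers instead:
    #
    #     >>> p = Pool(2)
    #     >>> r = p.map_async(abs, [-1, 2, -3])
    #     >>> p.close()
    #     >>> p.join()
    #     >>> r.get()
    #     [1, 2, 3]
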
    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # task_handler may be blocked trying to put items on inqueue
        debug('removing tasks from inqueue until task handler finished')
        inqueue._rlock.acquire()
        while task_handler.is_alive() and inqueue._reader.poll():
            inqueue._reader.recv()
            time.sleep(0)

    @classmethod
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
                        task_handler, result_handler, cache):
        # this is guaranteed to only be called once
        debug('finalizing pool')

        task_handler._state = TERMINATE
        taskqueue.put(None)                 # sentinel

        debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))

        assert result_handler.is_alive() or len(cache) == 0

        result_handler._state = TERMINATE
        outqueue.put(None)                  # sentinel

        if pool and hasattr(pool[0], 'terminate'):
            debug('terminating workers')
            for p in pool:
                p.terminate()

        debug('joining task handler')
        task_handler.join(1e100)

        debug('joining result handler')
        result_handler.join(1e100)

        if pool and hasattr(pool[0], 'terminate'):
            debug('joining pool workers')
            for p in pool:
                if p.is_alive():
                    # worker has not yet exited
                    debug('cleaning up worker %d' % p.pid)
                    p.join()

421#
422# Class whose instances are returned by `Pool.apply_async()`
423#
424
425class ApplyResult(object):
426
427 def __init__(self, cache, callback):
428 self._cond = threading.Condition(threading.Lock())
429 self._job = next(job_counter)
430 self._cache = cache
431 self._ready = False
432 self._callback = callback
433 cache[self._job] = self
434
435 def ready(self):
436 return self._ready
437
438 def successful(self):
439 assert self._ready
440 return self._success
441
442 def wait(self, timeout=None):
443 self._cond.acquire()
444 try:
445 if not self._ready:
446 self._cond.wait(timeout)
447 finally:
448 self._cond.release()
449
450 def get(self, timeout=None):
451 self.wait(timeout)
452 if not self._ready:
453 raise TimeoutError
454 if self._success:
455 return self._value
456 else:
457 raise self._value
458
459 def _set(self, i, obj):
460 self._success, self._value = obj
461 if self._callback and self._success:
462 self._callback(self._value)
463 self._cond.acquire()
464 try:
465 self._ready = True
466 self._cond.notify()
467 finally:
468 self._cond.release()
469 del self._cache[self._job]
470
#
# Class whose instances are returned by `Pool.map_async()`
#

class MapResult(ApplyResult):

    def __init__(self, cache, chunksize, length, callback):
        ApplyResult.__init__(self, cache, callback)
        self._success = True
        self._value = [None] * length
        self._chunksize = chunksize
        if chunksize <= 0:
            self._number_left = 0
            self._ready = True
        else:
            self._number_left = length//chunksize + bool(length % chunksize)

    def _set(self, i, success_result):
        success, result = success_result
        if success:
            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
            self._number_left -= 1
            if self._number_left == 0:
                if self._callback:
                    self._callback(self._value)
                del self._cache[self._job]
                self._cond.acquire()
                try:
                    self._ready = True
                    self._cond.notify()
                finally:
                    self._cond.release()

        else:
            self._success = False
            self._value = result
            del self._cache[self._job]
            self._cond.acquire()
            try:
                self._ready = True
                self._cond.notify()
            finally:
                self._cond.release()

#
# Class whose instances are returned by `Pool.imap()`
#

class IMapIterator(object):

    def __init__(self, cache):
        self._cond = threading.Condition(threading.Lock())
        self._job = next(job_counter)
        self._cache = cache
        self._items = collections.deque()
        self._index = 0
        self._length = None
        self._unsorted = {}
        cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        self._cond.acquire()
        try:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    raise StopIteration
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        raise StopIteration
                    raise TimeoutError
        finally:
            self._cond.release()

        success, value = item
        if success:
            return value
        raise value

    __next__ = next                     # Python 3 iterator protocol alias

    def _set(self, i, obj):
        self._cond.acquire()
        try:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                self._cond.notify()
            else:
                self._unsorted[i] = obj

            if self._index == self._length:
                del self._cache[self._job]
        finally:
            self._cond.release()

    def _set_length(self, length):
        self._cond.acquire()
        try:
            self._length = length
            if self._index == self._length:
                self._cond.notify()
                del self._cache[self._job]
        finally:
            self._cond.release()

#
# Class whose instances are returned by `Pool.imap_unordered()`
#

class IMapUnorderedIterator(IMapIterator):

    def _set(self, i, obj):
        self._cond.acquire()
        try:
            self._items.append(obj)
            self._index += 1
            self._cond.notify()
            if self._index == self._length:
                del self._cache[self._job]
        finally:
            self._cond.release()

#
# Class representing a pool of worker threads rather than processes
#

class ThreadPool(Pool):

    from .dummy import Process

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        self._inqueue = queue.Queue()
        self._outqueue = queue.Queue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # put sentinels at head of inqueue to make workers finish
        inqueue.not_empty.acquire()
        try:
            inqueue.queue.clear()
            inqueue.queue.extend([None] * size)
            inqueue.not_empty.notify_all()
        finally:
            inqueue.not_empty.release()
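
# Illustrative usage, not part of the original module.  ThreadPool shares
# Pool's API but runs tasks in daemon threads of the current process, so
# no child processes are spawned and arguments need not be picklable:
#
#     >>> tp = ThreadPool(4)
#     >>> tp.map(len, ['a', 'bb', 'ccc'])
#     [1, 2, 3]
#     >>> tp.close(); tp.join()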