#
# Module providing the `Pool` class for managing a process pool
#
# multiprocessing/pool.py
#
# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
#

__all__ = ['Pool']

#
# Imports
#

import threading
import Queue
import itertools
import collections
import time

from multiprocessing import Process, cpu_count, TimeoutError
from multiprocessing.util import Finalize, debug

#
# Constants representing the state of a pool
#

RUN = 0
CLOSE = 1
TERMINATE = 2

#
# Miscellaneous
#

job_counter = itertools.count()

def mapstar(args):
    return map(*args)

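# For illustration: mapstar() unpacks one batched task, so
# mapstar((f, (1, 2, 3))) is equivalent to map(f, (1, 2, 3)), i.e.
# [f(1), f(2), f(3)].  It lets chunked map tasks reuse the plain
# single-call path in the worker loop below.
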
#
# Code run by worker processes
#

def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception, e:
            result = (False, e)
        put((job, i, result))
        completed += 1
    debug('worker exiting after %d tasks' % completed)

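# For illustration, the protocol used above: every task sent to a worker
# is a tuple (job, i, func, args, kwds), where `i` is the position of the
# task within its job (None for apply_async), and every reply is
# (job, i, (success, value)) -- with success False and value the raised
# exception when func failed.  A None task is the shutdown sentinel.
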
#
# Class representing a process pool
#

class Pool(object):
    '''
    Class which supports an async version of the `apply()` builtin
    '''
    Process = Process

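    # A minimal usage sketch (illustrative only, assuming a picklable
    # top-level function `square`):
    #
    #     from multiprocessing import Pool
    #
    #     def square(x):
    #         return x * x
    #
    #     pool = Pool(processes=4)
    #     print pool.map(square, range(10))    # [0, 1, 4, ..., 81]
    #     pool.close()
    #     pool.join()
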
    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None):
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}
        self._state = RUN
        self._maxtasksperchild = maxtasksperchild
        self._initializer = initializer
        self._initargs = initargs

        if processes is None:
            try:
                processes = cpu_count()
            except NotImplementedError:
                processes = 1

        if initializer is not None and not hasattr(initializer, '__call__'):
            raise TypeError('initializer must be a callable')

        self._processes = processes
        self._pool = []
        self._repopulate_pool()

        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self, )
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN
        self._worker_handler.start()

        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._worker_handler, self._task_handler,
                  self._result_handler, self._cache),
            exitpriority=15
            )

    def _join_exited_workers(self):
        """Cleanup after any worker processes which have exited due to reaching
        their specified lifetime.  Returns True if any workers were cleaned up.
        """
        cleaned = False
        for i in reversed(range(len(self._pool))):
            worker = self._pool[i]
            if worker.exitcode is not None:
                # worker exited
                debug('cleaning up worker %d' % i)
                worker.join()
                cleaned = True
                del self._pool[i]
        return cleaned

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        for i in range(self._processes - len(self._pool)):
            w = self.Process(target=worker,
                             args=(self._inqueue, self._outqueue,
                                   self._initializer,
                                   self._initargs, self._maxtasksperchild)
                             )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            debug('added worker')

    def _maintain_pool(self):
        """Clean up any exited workers and start replacements for them.
        """
        if self._join_exited_workers():
            self._repopulate_pool()

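    # Design note: _quick_put/_quick_get below cache bound methods of the
    # underlying pipe connections so the handler threads can bypass the
    # SimpleQueue wrappers; ThreadPool overrides _setup_queues to
    # substitute plain Queue.Queue objects instead.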
    def _setup_queues(self):
        from .queues import SimpleQueue
        self._inqueue = SimpleQueue()
        self._outqueue = SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv

    def apply(self, func, args=(), kwds={}):
        '''
        Equivalent of `apply()` builtin
        '''
        assert self._state == RUN
        return self.apply_async(func, args, kwds).get()

    def map(self, func, iterable, chunksize=None):
        '''
        Equivalent of `map()` builtin
        '''
        assert self._state == RUN
        return self.map_async(func, iterable, chunksize).get()

    def imap(self, func, iterable, chunksize=1):
        '''
        Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                                  for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                                  for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)

    def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like `imap()` method but ordering of results is arbitrary
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                                  for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                                  for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)

    def apply_async(self, func, args=(), kwds={}, callback=None):
        '''
        Asynchronous equivalent of `apply()` builtin
        '''
        assert self._state == RUN
        result = ApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result

    def map_async(self, func, iterable, chunksize=None, callback=None):
        '''
        Asynchronous equivalent of `map()` builtin
        '''
        assert self._state == RUN
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)

        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1
        if len(iterable) == 0:
            chunksize = 0

        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self._cache, chunksize, len(iterable), callback)
        self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                              for i, x in enumerate(task_batches)), None))
        return result

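    # Worked example of the default chunksize above: for 100 items and 4
    # worker processes, divmod(100, 4 * 4) == (6, 4); the nonzero
    # remainder bumps chunksize to 7, giving 15 chunks spread across the
    # workers.
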
    @staticmethod
    def _handle_workers(pool):
        while pool._worker_handler._state == RUN and pool._state == RUN:
            pool._maintain_pool()
            time.sleep(0.1)
        debug('worker handler exiting')

    @staticmethod
    def _handle_tasks(taskqueue, put, outqueue, pool):
        thread = threading.current_thread()

        for taskseq, set_length in iter(taskqueue.get, None):
            i = -1
            for i, task in enumerate(taskseq):
                if thread._state:
                    debug('task handler found thread._state != RUN')
                    break
                try:
                    put(task)
                except IOError:
                    debug('could not put task on queue')
                    break
            else:
                if set_length:
                    debug('doing set_length()')
                    set_length(i+1)
                continue
            break
        else:
            debug('task handler got sentinel')

        try:
            # tell result handler to finish when cache is empty
            debug('task handler sending sentinel to result handler')
            outqueue.put(None)

            # tell workers there is no more work
            debug('task handler sending sentinel to workers')
            for p in pool:
                put(None)
        except IOError:
            debug('task handler got IOError when sending sentinels')

        debug('task handler exiting')

    @staticmethod
    def _handle_results(outqueue, get, cache):
        thread = threading.current_thread()

        while 1:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if thread._state:
                assert thread._state == TERMINATE
                debug('result handler found thread._state=TERMINATE')
                break

            if task is None:
                debug('result handler got sentinel')
                break

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass

        while cache and thread._state != TERMINATE:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if task is None:
                debug('result handler ignoring extra sentinel')
                continue
            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass

        if hasattr(outqueue, '_reader'):
            debug('ensuring that outqueue is not full')
            # If we don't make room available in outqueue then
            # attempts to add the sentinel (None) to outqueue may
            # block.  There is guaranteed to be no more than 2 sentinels.
            try:
                for i in range(10):
                    if not outqueue._reader.poll():
                        break
                    get()
            except (IOError, EOFError):
                pass

        debug('result handler exiting: len(cache)=%s, thread._state=%s',
              len(cache), thread._state)

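    # For illustration: list(Pool._get_tasks(f, range(5), 2)) yields
    # (f, (0, 1)), (f, (2, 3)), (f, (4,)) -- each batch carries the
    # function so that mapstar() can apply it on the worker side.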
    @staticmethod
    def _get_tasks(func, it, size):
        it = iter(it)
        while 1:
            x = tuple(itertools.islice(it, size))
            if not x:
                return
            yield (func, x)

    def __reduce__(self):
        raise NotImplementedError(
            'pool objects cannot be passed between processes or pickled'
            )

    def close(self):
        debug('closing pool')
        if self._state == RUN:
            self._state = CLOSE
            self._worker_handler._state = CLOSE
            self._taskqueue.put(None)

    def terminate(self):
        debug('terminating pool')
        self._state = TERMINATE
        self._worker_handler._state = TERMINATE
        self._terminate()

    def join(self):
        debug('joining pool')
        assert self._state in (CLOSE, TERMINATE)
        self._worker_handler.join()
        self._task_handler.join()
        self._result_handler.join()
        for p in self._pool:
            p.join()

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # task_handler may be blocked trying to put items on inqueue
        debug('removing tasks from inqueue until task handler finished')
        inqueue._rlock.acquire()
        while task_handler.is_alive() and inqueue._reader.poll():
            inqueue._reader.recv()
            time.sleep(0)

    @classmethod
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
                        worker_handler, task_handler, result_handler, cache):
        # this is guaranteed to only be called once
        debug('finalizing pool')

        worker_handler._state = TERMINATE
        task_handler._state = TERMINATE
        taskqueue.put(None)                 # sentinel

        debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))

        assert result_handler.is_alive() or len(cache) == 0

        result_handler._state = TERMINATE
        outqueue.put(None)                  # sentinel

        # Terminate workers which haven't already finished.
        if pool and hasattr(pool[0], 'terminate'):
            debug('terminating workers')
            for p in pool:
                if p.exitcode is None:
                    p.terminate()

        debug('joining task handler')
        task_handler.join(1e100)

        debug('joining result handler')
        result_handler.join(1e100)

        if pool and hasattr(pool[0], 'terminate'):
            debug('joining pool workers')
            for p in pool:
                if p.is_alive():
                    # worker has not yet exited
                    debug('cleaning up worker %d' % p.pid)
                    p.join()

#
# Class whose instances are returned by `Pool.apply_async()`
#

class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self

    def ready(self):
        return self._ready

    def successful(self):
        assert self._ready
        return self._success

    def wait(self, timeout=None):
        self._cond.acquire()
        try:
            if not self._ready:
                self._cond.wait(timeout)
        finally:
            self._cond.release()

    def get(self, timeout=None):
        self.wait(timeout)
        if not self._ready:
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]

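# Continuing the sketch above (illustrative): the result object returned
# by apply_async() is used like
#
#     res = pool.apply_async(square, (3,))
#     res.ready()         # False until a worker replies
#     res.get(timeout=5)  # 9 -- or raises TimeoutError, or re-raises the
#                         # exception the worker hit
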
#
# Class whose instances are returned by `Pool.map_async()`
#

class MapResult(ApplyResult):

    def __init__(self, cache, chunksize, length, callback):
        ApplyResult.__init__(self, cache, callback)
        self._success = True
        self._value = [None] * length
        self._chunksize = chunksize
        if chunksize <= 0:
            self._number_left = 0
            self._ready = True
        else:
            self._number_left = length//chunksize + bool(length % chunksize)

    def _set(self, i, success_result):
        success, result = success_result
        if success:
            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
            self._number_left -= 1
            if self._number_left == 0:
                if self._callback:
                    self._callback(self._value)
                del self._cache[self._job]
                self._cond.acquire()
                try:
                    self._ready = True
                    self._cond.notify()
                finally:
                    self._cond.release()
        else:
            self._success = False
            self._value = result
            del self._cache[self._job]
            self._cond.acquire()
            try:
                self._ready = True
                self._cond.notify()
            finally:
                self._cond.release()

#
# Class whose instances are returned by `Pool.imap()`
#

class IMapIterator(object):

    def __init__(self, cache):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._items = collections.deque()
        self._index = 0
        self._length = None
        self._unsorted = {}
        cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        self._cond.acquire()
        try:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    raise StopIteration
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        raise StopIteration
                    raise TimeoutError
        finally:
            self._cond.release()

        success, value = item
        if success:
            return value
        raise value

    __next__ = next                    # XXX

    def _set(self, i, obj):
        self._cond.acquire()
        try:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                self._cond.notify()
            else:
                self._unsorted[i] = obj

            if self._index == self._length:
                del self._cache[self._job]
        finally:
            self._cond.release()

    def _set_length(self, length):
        self._cond.acquire()
        try:
            self._length = length
            if self._index == self._length:
                self._cond.notify()
                del self._cache[self._job]
        finally:
            self._cond.release()

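# For illustration of the ordering machinery above: IMapIterator parks
# out-of-order results in _unsorted until all earlier indices have been
# delivered, so imap() yields results in input order; the subclass below
# skips that bookkeeping and hands results out as they arrive.
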
#
# Class whose instances are returned by `Pool.imap_unordered()`
#

class IMapUnorderedIterator(IMapIterator):

    def _set(self, i, obj):
        self._cond.acquire()
        try:
            self._items.append(obj)
            self._index += 1
            self._cond.notify()
            if self._index == self._length:
                del self._cache[self._job]
        finally:
            self._cond.release()

#
#
#

class ThreadPool(Pool):

    from .dummy import Process

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        self._inqueue = Queue.Queue()
        self._outqueue = Queue.Queue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # put sentinels at head of inqueue to make workers finish
        inqueue.not_empty.acquire()
        try:
            inqueue.queue.clear()
            inqueue.queue.extend([None] * size)
            inqueue.not_empty.notify_all()
        finally:
            inqueue.not_empty.release()
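
# A minimal ThreadPool sketch (illustrative): same API as Pool, but the
# workers are threads from multiprocessing.dummy, so it suits I/O-bound
# callables and needs no picklable functions:
#
#     tp = ThreadPool(processes=4)
#     print tp.map(len, ['a', 'bb', 'ccc'])    # [1, 2, 3]
#     tp.close()
#     tp.join()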