#
# Copyright 2008 Google Inc.
# Released under the GPLv2

import threading

try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # Python 3 renamed the module to 'queue'

class ThreadPool:
    """A generic thread pool for use in the CLI.

    ThreadPool takes the function to be executed as an argument and,
    optionally, the number of threads. It starts that many daemon worker
    threads; each worker repeatedly pulls one item off a shared work queue
    and calls the function with that item as its only argument.

    Typical use: construct the pool, add items with queue_work(), then call
    wait() to tell the workers to exit and block until they have all
    finished.

    Attributes:
        function: the callable applied to every queued work item.
        numthreads: total number of worker threads started so far.
        queue: the work queue shared by all workers.
        threads: queue of started Thread objects not yet joined by wait().
    """

    # Shutdown marker placed on the work queue. It is a unique object that
    # is compared by identity, so no work item (for instance the literal
    # string 'die') can ever be mistaken for it.
    _DIE = object()

    def __init__(self, function, numthreads=40):
        """Create the pool and immediately start its worker threads.

        Args:
            function: callable invoked as function(item) for each work item.
            numthreads: number of worker threads to start; must be > 0.

        Raises:
            AssertionError: if numthreads is not positive.
        """
        assert numthreads > 0
        self.threads = Queue.Queue(0)
        self.function = function
        self.numthreads = 0
        self.queue = Queue.Queue(0)
        # Guards self.numthreads: worker threads may start additional
        # workers concurrently through add_one_thread_post_wait().
        self._numthreads_lock = threading.Lock()
        self._start_threads(numthreads)

    def wait(self):
        """Tell every worker to exit and block until all have finished.

        One shutdown marker per known worker is appended to the work queue,
        so items queued earlier are handed to workers before the markers.

        Raises:
            AssertionError: if the number of threads joined differs from
                the number of threads started.
        """
        for _ in range(self.numthreads):
            self.queue.put(self._DIE)
        # As only spawned threads are allowed to add new ones, we can
        # safely stop once the thread queue is empty: a thread that creates
        # a new one queues it before it finishes itself, so by the time
        # join() returns the new thread is already visible here. That is
        # also why a non-blocking get is enough and no timeout is needed.
        dead = 0
        while True:
            try:
                thread = self.threads.get_nowait()
            except Queue.Empty:
                assert dead == self.numthreads
                return
            # Every queued thread has already been started (see
            # _start_threads), so join() is always legal; it returns
            # immediately when the thread has already exited.
            thread.join()
            dead += 1

    def queue_work(self, data):
        """Append every item of the iterable `data` to the work queue."""
        for item in data:
            self.queue.put(item)

    def add_one_thread_post_wait(self):
        """Start one extra worker and queue a shutdown marker for it.

        Only a spawned thread (not the main one) should call this (see
        wait() for details). wait() has already queued one marker per
        worker it knew about, so the extra worker needs its own.
        """
        self._start_threads(1)
        self.queue.put(self._DIE)

    def _start_threads(self, nthreads):
        """Start `nthreads` daemon worker threads and register them."""
        with self._numthreads_lock:
            self.numthreads += nthreads
        for _ in range(nthreads):
            thread = threading.Thread(target=self._new_worker)
            thread.daemon = True
            # Start before publishing the thread, so that wait() never
            # dequeues a thread that has not been started yet.
            thread.start()
            self.threads.put(thread)

    def _new_worker(self):
        """Worker loop: process queued items until a shutdown marker.

        Exceptions raised by the function are reported on stdout and do not
        stop the worker.
        """
        while True:
            # Blocking call
            data = self.queue.get()
            if data is self._DIE:
                return
            try:
                self.function(data)
            except Exception as full_error:
                # Put a catch all here. Not every callable has a __name__
                # (functools.partial, callable instances), so fall back to
                # repr() rather than letting the report itself raise.
                name = getattr(self.function, '__name__',
                               repr(self.function))
                print('Unexpected failure in the thread calling %s: %s' %
                      (name, full_error))