blob: eabefcaeb8f326e73dd947a1f6195348c6429520 [file] [log] [blame]
mblighbe630eb2008-08-01 16:41:48 +00001#
mbligh1ef218d2009-08-03 16:57:56 +00002# Copyright 2008 Google Inc.
mblighd909feb2009-06-15 21:24:42 +00003# Released under the GPLv2
mblighbe630eb2008-08-01 16:41:48 +00004
5import threading, Queue
6
7class ThreadPool:
8 """ A generic threading class for use in the CLI
9 ThreadPool class takes the function to be executed as an argument and
10 optionally number of threads. It then creates multiple threads for
11 faster execution. """
12
13 def __init__(self, function, numthreads=40):
14 assert(numthreads > 0)
15 self.threads = Queue.Queue(0)
16 self.function = function
17 self.numthreads = 0
18 self.queue = Queue.Queue(0)
19 self._start_threads(numthreads)
20
21
22 def wait(self):
23 """ Checks to see if any threads are still working and
24 blocks until worker threads all complete. """
25 for x in xrange(self.numthreads):
26 self.queue.put('die')
27 # As only spawned threads are allowed to add new ones,
28 # we can safely wait for the thread queue to be empty
29 # (if we're at the last thread and it creates a new one,
30 # it will get queued before it finishes).
31 dead = 0
32 while True:
33 try:
34 thread = self.threads.get(block=True, timeout=1)
35 if thread.isAlive():
36 thread.join()
37 dead += 1
38 except Queue.Empty:
39 assert(dead == self.numthreads)
40 return
41
42
43 def queue_work(self, data):
44 """ Takes a list of items and appends them to the
45 work queue. """
46 [self.queue.put(item) for item in data]
47
48
49 def add_one_thread_post_wait(self):
50 # Only a spawned thread (not the main one)
51 # should call this (see wait() for details)
52 self._start_threads(1)
53 self.queue.put('die')
54
55
56 def _start_threads(self, nthreads):
57 """ Start up threads to spawn workers. """
58 self.numthreads += nthreads
mblighcd26d042010-05-03 18:58:24 +000059 for i in xrange(nthreads):
mblighbe630eb2008-08-01 16:41:48 +000060 thread = threading.Thread(target=self._new_worker)
61 thread.setDaemon(True)
62 self.threads.put(thread)
63 thread.start()
64
65
66 def _new_worker(self):
67 """ Spawned worker threads. These threads loop until queue is empty."""
68 while True:
69 # Blocking call
70 data = self.queue.get()
71 if data == 'die':
72 return
mblighd84f9dc2009-02-03 02:04:46 +000073 try:
74 self.function(data)
mblighcd26d042010-05-03 18:58:24 +000075 except Exception, full_error:
76 # Put a catch all here.
77 print ('Unexpected failure in the thread calling %s: %s' %
78 (self.function.__name__, full_error))