Get the pipeline process working and add a unit test for this class.

BUG=None
TEST=None

Change-Id: I7fe9fd5b1610959399000b1dfc9b6db55c5c28fb
Reviewed-on: https://gerrit-int.chromium.org/39473
Reviewed-by: Luis Lozano <llozano@chromium.org>
Tested-by: Yuheng Long <yuhenglong@google.com>
Commit-Queue: Yuheng Long <yuhenglong@google.com>
diff --git a/bestflags/pipeline_process.py b/bestflags/pipeline_process.py
index 6b878b3..21d3c58 100644
--- a/bestflags/pipeline_process.py
+++ b/bestflags/pipeline_process.py
@@ -1,37 +1,76 @@
 """Pipeline process that encapsulates the actual content.
 
-The actual stages include the Steering algorithm, the builder and the executor.
+The actual stages include the builder and the executor.
 """
 
 __author__ = 'yuhenglong@google.com (Yuheng Long)'
 
 import multiprocessing
 
+# Pick an integer at random.
+POISONPILL = 975
+
 
 class PipelineProcess(multiprocessing.Process):
-  """A process that encapsulates the actual content.
+  """A process that encapsulates the actual content pipeline stage.
 
-  It continuously pull tasks from the queue until a poison pill is received.
+  The actual pipeline stage can be the builder or the tester.  This process
+  continuously pulls tasks from the queue until a poison pill is received.
   Once a job is received, it will hand it to the actual stage for processing.
+
+  Each pipeline stage contains three modules.
+  The first module continuously pulls tasks from the input queue. It searches
+  the cache to check whether a task has been encountered before. If so,
+  duplicate computation can be avoided.
+  The second module consists of a pool of workers that do the actual work,
+  e.g., in the builder pipeline stage a worker compiles the source code and
+  produces the image.
+  The third module is a helper that writes the result cost into the cost field
+  of the duplicate tasks. For example, if two tasks are equivalent, only one
+  task, say t1, will be executed and the other task, say t2, will not. The
+  helper gets the result from t1 when it is available and sets the cost of t2
+  to be the same as that of t1.
   """
 
-  # Poison pill means shutdown
-  POISON_PILL = None
-
-  def __init__(self, method, task_queue, result_queue):
+  def __init__(self, num_processes, name, cache, stage, task_queue, helper,
+               worker, result_queue):
     """Set up input/output queue and the actual method to be called.
 
     Args:
-      method: The actual pipeline stage to be invoked.
+      num_processes: Number of worker subprocesses this stage has.
+      name: The name of this stage.
+      cache: The computed tasks encountered before.
+      stage: An int value that identifies this pipeline stage, for example,
+        the build stage or the test stage. The value is used to retrieve the
+        task key in each stage, i.e., the flag set is the key in the build
+        stage and the checksum is the key in the test stage. The key is used
+        to detect duplicates.
       task_queue: The input task queue for this pipeline stage.
+      helper: The method hosted by the helper module to fill in the cost of
+        the duplicate tasks.
+      worker: The method hosted by the worker pool to do the actual work,
+        e.g., compile the image.
       result_queue: The output task queue for this pipeline stage.
     """
 
     multiprocessing.Process.__init__(self)
-    self._method = method
+
+    self._name = name
     self._task_queue = task_queue
     self._result_queue = result_queue
 
+    self._helper = helper
+    self._worker = worker
+
+    self._cache = cache
+    self._stage = stage
+    self._num_processes = num_processes
+
+    # the queues used by the modules for communication
+    manager = multiprocessing.Manager()
+    self._helper_queue = manager.Queue()
+    self._work_queue = manager.Queue()
+
   def run(self):
     """Busy pulling the next task from the queue for execution.
 
@@ -41,11 +80,39 @@
     The process will terminate on receiving the poison pill from previous stage.
     """
 
+    # the worker pool
+    self._pool = multiprocessing.Pool(self._num_processes)
+
+    # the helper process
+    helper_process = multiprocessing.Process(target=self._helper,
+                                             args=(self._cache,
+                                                   self._helper_queue,
+                                                   self._work_queue,
+                                                   self._result_queue))
+    helper_process.start()
+    mycache = list(self._cache.keys())
+
     while True:
-      next_task = self.task_queue.get()
-      if next_task is None:
+      task = self._task_queue.get()
+      if task == POISONPILL:
         # Poison pill means shutdown
-        self.result_queue.put(None)
+        self._result_queue.put(POISONPILL)
         break
-      self._method(next_task)
-      self.result_queue.put(next_task)
+
+      task_key = task.get_key(self._stage)
+      if task_key in mycache:
+        # The task has been encountered before. It will be sent to the helper
+        # module for further processing.
+        self._helper_queue.put(task)
+      else:
+        # Let the workers do the actual work.
+        self._pool.apply_async(self._worker, args=(task, self._work_queue,
+                                                   self._result_queue))
+        mycache.append(task_key)
+
+    # Shutdown the workers pool and the helper process.
+    self._pool.close()
+    self._pool.join()
+
+    self._helper_queue.put(POISONPILL)
+    helper_process.join()
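
For reference, below is a minimal sketch of how a stage like this might be
driven. The BUILD_STAGE id, FakeTask, fake_worker and fake_helper names are
hypothetical stand-ins invented for illustration and are not part of this
change; only the PipelineProcess constructor, POISONPILL and the helper/worker
call signatures come from the code above.

  import multiprocessing

  # Assumes this script sits next to bestflags/pipeline_process.py.
  import pipeline_process

  # Hypothetical stage id; real stage ids are defined elsewhere.
  BUILD_STAGE = 1


  class FakeTask(object):
    """A stand-in task exposing the get_key interface the stage expects."""

    def __init__(self, flag_set):
      self._flag_set = flag_set

    def get_key(self, stage):
      # The flag set serves as the duplicate-detection key in the build stage.
      return self._flag_set


  def fake_worker(task, work_queue, result_queue):
    """Pretend to build the task; forward it to the helper and downstream."""
    work_queue.put(task)
    result_queue.put(task)


  def fake_helper(cache, helper_queue, work_queue, result_queue):
    """Drain duplicate tasks until the poison pill arrives."""
    while True:
      if helper_queue.get() == pipeline_process.POISONPILL:
        break


  if __name__ == '__main__':
    manager = multiprocessing.Manager()
    task_queue = manager.Queue()
    result_queue = manager.Queue()

    stage = pipeline_process.PipelineProcess(
        2, 'builder', {}, BUILD_STAGE, task_queue, fake_helper, fake_worker,
        result_queue)
    stage.start()

    task_queue.put(FakeTask('-O2'))
    task_queue.put(pipeline_process.POISONPILL)
    stage.join()

The fake helper here only drains its queue; in the real stage the helper would
also read completed results from the work queue and fill in the cost of the
duplicate tasks, as described in the class docstring.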