Get the pipeline process working. Add the unit test for this class.

BUG=None
TEST=None

Change-Id: I7fe9fd5b1610959399000b1dfc9b6db55c5c28fb
Reviewed-on: https://gerrit-int.chromium.org/39473
Reviewed-by: Luis Lozano <llozano@chromium.org>
Tested-by: Yuheng Long <yuhenglong@google.com>
Commit-Queue: Yuheng Long <yuhenglong@google.com>
diff --git a/bestflags/pipeline_process_test.py b/bestflags/pipeline_process_test.py
index 8df2327..e40b5a2 100644
--- a/bestflags/pipeline_process_test.py
+++ b/bestflags/pipeline_process_test.py
@@ -2,10 +2,42 @@
 
 __author__ = 'yuhenglong@google.com (Yuheng Long)'
 
+import multiprocessing
 import unittest
 
 import pipeline_process
 
+# Arbitrary sentinel that MockHelper emits for a task already in done_dict.
+ERROR = -334
+
+
+def MockHelper(done_dict, helper_queue, work_queue, result_queue):
+  """This method echoes its input to the output."""
+  while True:
+    if not helper_queue.empty():
+      task = helper_queue.get()
+      if task == pipeline_process.POISONPILL:
+        # Poison pill means shutdown
+        break
+
+      if task in done_dict:
+        # Verify that the helper does not receive a duplicate "1" in the test.
+        result_queue.put(ERROR)
+      else:
+        result_queue.put(('helper', task.get_key(0)))
+
+
+def MockWorker(task, buffer_queue, result_queue):
+  result_queue.put(('worker', task.get_key(0)))
+
+
+class MockTask(object):
+  def __init__(self, key):
+    self._key = key
+
+  def get_key(self, stage):
+    return self._key
+
 
 class PipelineProcessTest(unittest.TestCase):
   """This class test the PipelineProcess.
@@ -19,11 +51,38 @@
     pass
 
   def testRun(self):
-    """"Test the run method.
+    """Test the run method.
 
     Ensure that all the tasks inserted into the queue are properly handled.
     """
-    pass
+
+    manager = multiprocessing.Manager()
+    inp = manager.Queue()
+    output = manager.Queue()
+
+    process = pipeline_process.PipelineProcess(2, 'testing', {}, 'test', inp,
+                                               MockHelper, MockWorker, output)
+
+    process.start()
+    inp.put(MockTask(1))
+    inp.put(MockTask(1))
+    inp.put(MockTask(2))
+    inp.put(pipeline_process.POISONPILL)
+    process.join()
+
+    # All tasks are processed once and only once.
+    result = [('worker', 1), ('helper', 1), ('worker', 2),
+              pipeline_process.POISONPILL]
+    while result:
+      task = output.get()
+
+      # One "1" is passed to the worker and the other to the helper.
+      self.assertNotEqual(task, ERROR)
+
+      # The messages received should be exactly the same as the result.
+      self.assertTrue(task in result)
+      result.remove(task)
+
 
 if __name__ == '__main__':
   unittest.main()