Merge the test stage and the build stage.

Use the pipeline_worker to represent both stages. The two stages differ in how
they get the key of a task in the stage and how they set the value for the
stage. The key uniquely distinguishes one task from another. For example, the
key for the build stage is the optimization flags, while the key for the test
stage is the checksum of the built image.

BUG=None
TEST=None

Change-Id: Id057c17026806acd406fe78cc70ed996273ca9aa
Reviewed-on: https://gerrit-int.chromium.org/40376
Reviewed-by: Simon Que <sque@google.com>
Reviewed-by: Luis Lozano <llozano@chromium.org>
Commit-Queue: Yuheng Long <yuhenglong@google.com>
Tested-by: Yuheng Long <yuhenglong@google.com>
diff --git a/bestflags/pipeline_worker_test.py b/bestflags/pipeline_worker_test.py
new file mode 100644
index 0000000..340b301
--- /dev/null
+++ b/bestflags/pipeline_worker_test.py
@@ -0,0 +1,173 @@
+"""Unittest for the pipeline_worker functions in the build/test stage.
+
+This module tests the helper method and the worker method.
+"""
+
+__author__ = 'yuhenglong@google.com (Yuheng Long)'
+
+import multiprocessing
+import random
+import sys
+import unittest
+
+import pipeline_process
+import pipeline_worker
+
+
+TESTSTAGE = 0  # The single stage value used by these tests; MockTask asserts it.
+
+
+def MockTaskCostGenerator():
+  """Returns a random negative integer in [-sys.maxsize - 1, -1]."""
+  return random.randint(-sys.maxsize - 1, -1)
+
+
+class MockTask(object):
+  """Emulates an actual task without doing any real work.
+
+  work() reports the cost given at construction time as the result, while
+  set_result() stores a result supplied by the caller.
+  """
+
+  def __init__(self, identifier, cost):
+    """Set up the results for this task.
+
+    The cost is kept in _pre_cost. The _cost field only exists once work() or
+    set_result() has been called, which is what done() checks.
+
+    Args:
+      identifier: the identifier of this task.
+      cost: the mock cost of this task.
+    """
+    self._identifier = identifier
+    self._pre_cost = cost
+
+  def get_identifier(self, stage):
+    assert stage == TESTSTAGE
+    return self._identifier
+
+  def __eq__(self, other):
+    if not isinstance(other, MockTask):
+      return False
+    # An unperformed task has no _cost yet; do not raise AttributeError for it.
+    return (self._identifier == other._identifier and
+            getattr(self, '_cost', None) == getattr(other, '_cost', None))
+
+  def set_result(self, stage, cost):
+    assert stage == TESTSTAGE
+    self._cost = cost
+
+  def work(self, stage):
+    assert stage == TESTSTAGE
+    self._cost = self._pre_cost
+
+  def get_result(self, stage):
+    assert stage == TESTSTAGE
+    return self._cost
+
+  def done(self, stage):
+    """Indicates whether the task has been performed."""
+    assert stage == TESTSTAGE
+    return '_cost' in self.__dict__
+
+
+class AuxiliaryTest(unittest.TestCase):
+  """This class tests the pipeline_worker functions.
+
+  Given the same identifier, the pipeline_worker functions should yield the
+  same cost.
+  """
+
+  def testHelper(self):
+    """Test the helper.
+
+    Feed the helper tasks with duplicate identifiers and check that every
+    task coming out carries the cost published for its identifier.
+    """
+
+    # Set up the input, helper and output queue for the helper method.
+    manager = multiprocessing.Manager()
+    helper_queue = manager.Queue()
+    result_queue = manager.Queue()
+    completed_queue = manager.Queue()
+
+    # Set up the helper process that holds the helper method.
+    helper_process = multiprocessing.Process(target=pipeline_worker.helper,
+                                             args=(TESTSTAGE, {}, helper_queue,
+                                                   completed_queue,
+                                                   result_queue))
+    helper_process.start()
+
+    # A dictionary that defines the mock results fed to the helper.
+    mock_result = {1: 1995, 2: 59, 9: 1027}
+
+    # Test that a duplicate of an already completed task gets the same result.
+    # Two scenarios are covered: the mock result is added to the
+    # completed_queue before (identifier 9) and after (identifiers 1 and 2) the
+    # corresponding mock tasks are added to the input queue.
+    completed_queue.put((9, mock_result[9]))
+
+    # The output of the helper should contain all the following tasks.
+    results = [1, 1, 2, 9]
+
+    # Identifier 1 appears twice to test several tasks sharing one
+    # identifier.
+    for result in results:
+      helper_queue.put(MockTask(result, MockTaskCostGenerator()))
+
+    completed_queue.put((2, mock_result[2]))
+    completed_queue.put((1, mock_result[1]))
+
+    # Signal that there are no more duplicate tasks.
+    helper_queue.put(pipeline_process.POISONPILL)
+    helper_process.join()
+
+    while results:
+      task = result_queue.get()
+      identifier = task._identifier
+      self.assertTrue(identifier in results)
+      # assertEqual, not assertTrue: the cost itself has to be compared.
+      if identifier in mock_result:
+        self.assertEqual(task._cost, mock_result[identifier])
+      results.remove(identifier)
+
+  def testWorker(self):
+    """Test the worker method.
+
+    The worker should process all the input tasks and output the tasks to the
+    helper and result queue.
+    """
+
+    manager = multiprocessing.Manager()
+    result_queue = manager.Queue()
+    completed_queue = manager.Queue()
+
+    # A dictionary that defines the mock tasks and their corresponding results.
+    mock_work_tasks = {1: 86, 2: 788}
+
+    mock_tasks = []
+
+    for flag, cost in mock_work_tasks.items():
+      mock_tasks.append(MockTask(flag, cost))
+
+    # Submit the mock tasks to the worker.
+    for mock_task in mock_tasks:
+      pipeline_worker.worker(TESTSTAGE, mock_task, completed_queue,
+                             result_queue)
+
+    # The tasks, from the output queue, should be the same as the input and
+    # should be performed.
+    for task in mock_tasks:
+      output = result_queue.get()
+      self.assertEqual(output, task)
+      self.assertTrue(output.done(TESTSTAGE))
+
+    # The tasks, from the completed_queue, should be defined in the
+    # mock_work_tasks dictionary.
+    for flag, cost in mock_work_tasks.items():
+      helper_input = completed_queue.get()
+      self.assertEqual(helper_input, (flag, cost))
+
+
+if __name__ == '__main__':
+  unittest.main()  # Run every test in this module when executed as a script.