Add the steering stage of the framework.

BUG=None
TEST=unit testings for the pipeline stage, pipeline workers, generation and
steering.

Change-Id: Id92bcf04ee24dfbc918f59ac8d87d30ee69e47b3
Reviewed-on: https://gerrit-int.chromium.org/41454
Reviewed-by: Simon Que <sque@google.com>
Reviewed-by: Luis Lozano <llozano@chromium.org>
Commit-Queue: Yuheng Long <yuhenglong@google.com>
Tested-by: Yuheng Long <yuhenglong@google.com>
diff --git a/bestflags/generation.py b/bestflags/generation.py
index f1a6363..7f0e94a 100644
--- a/bestflags/generation.py
+++ b/bestflags/generation.py
@@ -85,19 +85,26 @@
 
     Args:
       task: A task that has its results ready.
+
+    Returns:
+      Whether the input task belongs to this generation.
     """
 
-    # If there is a match.
-    if task in self._exe_pool:
-      # Remove the place holder task in this generation and store the new input
-      # task and its result.
-      self._exe_pool.remove(task)
-      self._exe_pool.add(task)
+    # If there is a match, the input task belongs to this generation.
+    if task not in self._exe_pool:
+      return False
 
-      # The current generation will have one less task to wait on.
-      self._pending -= 1
+    # Remove the place holder task in this generation and store the new input
+    # task and its result.
+    self._exe_pool.remove(task)
+    self._exe_pool.add(task)
 
-      assert self._pending >= 0
+    # The current generation will have one less task to wait on.
+    self._pending -= 1
+
+    assert self._pending >= 0
+
+    return True
 
   def Improve(self):
     """True if this generation has improvement over its parent generation.
diff --git a/bestflags/generation_test.py b/bestflags/generation_test.py
index 748a988..1249f7d 100644
--- a/bestflags/generation_test.py
+++ b/bestflags/generation_test.py
@@ -13,7 +13,7 @@
 import unittest
 
 from generation import Generation
-from mock_task import MockTask
+from mock_task import IdentifierMockTask
 
 
 # Pick an integer at random.
@@ -27,24 +27,6 @@
 STRIDE = 7
 
 
-class GenerationMockTask(MockTask):
-  """This class defines the mock task to test the Generation class.
-
-  The task instances will be inserted into a set. Therefore the hash and the
-  equal methods are overridden. The generation class considers the identifier to
-  set the cost of the task in a set, thus the identifier is used in the
-  overriding methods.
-  """
-
-  def __hash__(self):
-    return self._identifier
-
-  def __eq__(self, other):
-    if isinstance(other, MockTask):
-      return self._identifier == other.GetIdentifier(self._stage)
-    return False
-
-
 class GenerationTest(unittest.TestCase):
   """This class test the Generation class.
 
@@ -66,7 +48,7 @@
     testing_tasks = range(NUMTASKS)
 
     # The tasks for the generation to be tested.
-    generation_tasks = [GenerationMockTask(TESTSTAGE, t) for t in testing_tasks]
+    generation_tasks = [IdentifierMockTask(TESTSTAGE, t) for t in testing_tasks]
 
     gen = Generation(set(generation_tasks), None)
 
@@ -81,7 +63,7 @@
 
       # Mark a task as done by calling the UpdateTask method of the generation.
       # Send the generation the task as well as its results.
-      gen.UpdateTask(GenerationMockTask(TESTSTAGE, testing_task))
+      gen.UpdateTask(IdentifierMockTask(TESTSTAGE, testing_task))
 
     # The Done method should return true after all the tasks in the permuted
     # list is set.
diff --git a/bestflags/mock_task.py b/bestflags/mock_task.py
index 059536c..d2c3726 100644
--- a/bestflags/mock_task.py
+++ b/bestflags/mock_task.py
@@ -2,7 +2,7 @@
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
 
-"""This module defines the common mock task used by varies unit tests.
+"""This module defines the common mock tasks used by various unit tests.
 
 Part of the Chrome build flags optimization.
 """
@@ -66,3 +66,23 @@
 
     assert stage == self._stage
     return '_cost' in self.__dict__
+
+  def CacheSteeringCost(self):
+    pass
+
+
+class IdentifierMockTask(MockTask):
+  """This class defines the mock task that does not consider the cost.
+
+  The task instances will be inserted into a set. Therefore the hash and the
+  equal methods are overridden. The unittests that compares identities of the
+  tasks for equality can use this mock task instead of the base mock tack.
+  """
+
+  def __hash__(self):
+    return self._identifier
+
+  def __eq__(self, other):
+    if isinstance(other, MockTask):
+      return self._identifier == other.GetIdentifier(self._stage)
+    return False
diff --git a/bestflags/steering.py b/bestflags/steering.py
index ce718a4..09c7838 100644
--- a/bestflags/steering.py
+++ b/bestflags/steering.py
@@ -2,31 +2,115 @@
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
 
-"""A Genetic Algorithm implementation for selecting good flags.
+"""The framework stage that produces the next generation of tasks to run.
 
 Part of the Chrome build flags optimization.
 """
 
 __author__ = 'yuhenglong@google.com (Yuheng Long)'
 
+import pipeline_process
 
-class Steering(object):
-  """The steering algorithm that produce the next generation to be run."""
 
-  def __init__(self, steps):
-    """Set up the number of steps generations this algorithm should evolve.
+def Steering(cache, generations, input_queue, result_queue):
+  """The core method template that produces the next generation of tasks to run.
 
-    Args:
-      steps: number of steps that the feed back loop should perform
-    """
+  This method waits for the results of the tasks from the previous generation.
+  Upon the arrival of all these results, the method uses them to generate the
+  next generation of tasks.
 
-    self._steps = steps
+  The main logic of producing the next generation from previous generation is
+  application specific. For example, in the genetic algorithm, a task is
+  produced by combining two parents tasks, while in the hill climbing algorithm,
+  a task is generated by its immediate neighbor. The method 'Next' is overridden
+  in the concrete subclasses of the class Generation to produce the next
+  application-specific generation. The steering method invokes the 'Next'
+  method, produces the next generation and submits the tasks in this generation
+  to the next stage, e.g., the build/compilation stage.
 
-  def Run(self, generation):
-    """Generate a set of new generations for the next round of execution.
+  Args:
+    cache: It stores the experiments that have been conducted before. Used to
+      avoid duplicate works.
+    generations: The initial generations of tasks to be run.
+    input_queue: The input results from the last stage of the framework. These
+      results will trigger new iteration of the algorithm.
+    result_queue: The output task queue for this pipeline stage. The new tasks
+      generated by the steering algorithm will be sent to the next stage via
+      this queue.
+  """
 
-    Args:
-      generation: the previous generation.
-    """
+  # Generations that have pending tasks to be executed. Pending tasks are those
+  # whose results are not ready. The tasks that have their results ready are
+  # referenced to as ready tasks. Once there is no pending generation, the
+  # algorithm terminates.
+  waiting = generations
 
-    pass
+  # Record how many initial tasks there are. If there is no task at all, the
+  # algorithm can terminate right away.
+  num_tasks = 0
+
+  # Submit all the tasks in the initial generations to the next stage of the
+  # framework. The next stage can be the build/compilation stage.
+  for generation in generations:
+    # Only send the task that has not been performed before to the next stage.
+    for task in [task for task in generation.Pool() if task not in cache]:
+      result_queue.put(task)
+      cache.add(task)
+      num_tasks += 1
+
+  # If there is no task to be executed at all, the algorithm returns right away.
+  if not num_tasks:
+    # Inform the next stage that there will be no more task.
+    result_queue.put(pipeline_process.POISONPILL)
+    return
+
+  # The algorithm is done if there is no pending generation. A generation is
+  # pending if it has pending task.
+  while waiting:
+    # Busy-waiting for the next task.
+    if input_queue.empty():
+      continue
+
+    # If there is a task whose result is ready from the last stage of the
+    # feedback loop, there will be one less pending task.
+    task = input_queue.get()
+
+    # Store the result of this ready task. Intermediate results can be used to
+    # generate report for final result or be used to reboot from a crash from
+    # the failure of any module of the framework.
+    task.CacheSteeringCost()
+
+    # Find out which pending generation this ready task belongs to. This pending
+    # generation will have one less pending task. The "next" expression iterates
+    # the generations in waiting until the first generation whose UpdateTask
+    # method returns true.
+    generation = next(gen for gen in waiting if gen.UpdateTask(task))
+
+    # If there is still any pending task, do nothing.
+    if not generation.Done():
+      continue
+
+    # All the tasks in the generation are finished. The generation is ready to
+    # produce the next generation.
+    waiting.remove(generation)
+
+    # Check whether a generation should generate the next generation.
+    # A generation may not generate the next generation, e.g., because a
+    # fixpoint has been reached, there has not been any improvement for a few
+    # generations or a local maxima is reached.
+    if not generation.Improve():
+      continue
+
+    for new_generation in generation.Next(cache):
+      # Make sure that each generation should contain at least one task.
+      assert new_generation.Pool()
+      waiting.append(new_generation)
+
+      # Send the tasks of the new generations to the next stage for execution.
+      for new_task in new_generation.Pool():
+        result_queue.put(new_task)
+        cache.add(new_task)
+
+  # Steering algorithm is finished and it informs the next stage that there will
+  # be no more task.
+  result_queue.put(pipeline_process.POISONPILL)
diff --git a/bestflags/steering_test.py b/bestflags/steering_test.py
index 828d226..b3ce686 100644
--- a/bestflags/steering_test.py
+++ b/bestflags/steering_test.py
@@ -9,28 +9,165 @@
 
 __author__ = 'yuhenglong@google.com (Yuheng Long)'
 
+import multiprocessing
 import unittest
 
+from generation import Generation
+from mock_task import IdentifierMockTask
+import pipeline_process
 import steering
 
 
-class SteeringTest(unittest.TestCase):
-  """This class test the Steering class.
+# Pick an integer at random.
+STEERING_TEST_STAGE = -8
 
-  This steering algorithm should stop either it has generated a certain number
-  of generations or the generation has no further improvement.
+# The number of generations to be used to do the testing.
+NUMBER_OF_GENERATIONS = 20
+
+# The number of tasks to be put in each generation to be tested.
+NUMBER_OF_TASKS = 20
+
+# The stride of permutation used to shuffle the input list of tasks. Should be
+# relatively prime with NUMBER_OF_TASKS.
+STRIDE = 7
+
+
+class MockGeneration(Generation):
+  """This class emulates an actual generation.
+
+  It will output the next_generations when the method Next is called. The
+  next_generations is initiated when the MockGeneration instance is constructed.
   """
 
-  def setUp(self):
-    pass
+  def __init__(self, tasks, next_generations):
+    """Set up the next generations for this task.
 
-  def testGeneration(self):
-    """"Test proper termination for a number of generations."""
-    pass
+    Args:
+      tasks: A set of tasks to be run.
+      next_generations: A list of generations as the next generation of the
+        current generation.
+    """
+    Generation.__init__(self, tasks, None)
+    self._next_generations = next_generations
 
-  def testImprove(self):
-    """"Test proper termination for no improvement between generations."""
-    pass
+  def Next(self, _):
+    return self._next_generations
+
+  def Improve(self):
+    if self._next_generations:
+      return True
+    return False
+
+
+class SteeringTest(unittest.TestCase):
+  """This class test the steering method.
+
+  The steering algorithm should return if there is no new task in the initial
+  generation. The steering algorithm should send all the tasks to the next stage
+  and should terminate once there is no pending generation. A generation is
+  pending if it contains pending task. A task is pending if its (test) result
+  is not ready.
+  """
+
+  def testSteering(self):
+    """Test that the steering algorithm processes all the tasks properly.
+
+    Test that the steering algorithm sends all the tasks to the next stage. Test
+    that the steering algorithm terminates once all the tasks have been
+    processed, i.e., the results for the tasks are all ready.
+    """
+
+    # A list of generations used to test the steering stage.
+    generations = []
+
+    task_index = 0
+    previous_generations = None
+
+    # Generate a sequence of generations to be tested. Each generation will
+    # output the next generation in reverse order of the list when the "Next"
+    # method is called.
+    for _ in range(NUMBER_OF_GENERATIONS):
+      # Use a consecutive sequence of numbers as identifiers for the set of
+      # tasks put into a generation.
+      test_ranges = range(task_index, task_index + NUMBER_OF_TASKS)
+      tasks = [IdentifierMockTask(STEERING_TEST_STAGE, t) for t in test_ranges]
+      steering_tasks = set(tasks)
+
+      # Let the previous generation as the offspring generation of the current
+      # generation.
+      current_generation = MockGeneration(steering_tasks, previous_generations)
+      generations.insert(0, current_generation)
+      previous_generations = [current_generation]
+
+      task_index += NUMBER_OF_TASKS
+
+    # If there is no generation at all, the unittest returns right away.
+    if not current_generation:
+      return
+
+    # Set up the input and result queue for the steering method.
+    manager = multiprocessing.Manager()
+    input_queue = manager.Queue()
+    result_queue = manager.Queue()
+
+    steering_process = multiprocessing.Process(target=steering.Steering,
+                                               args=(set(),
+                                                     [current_generation],
+                                                     input_queue, result_queue))
+    steering_process.start()
+
+    # Test that each generation is processed properly. I.e., the generations are
+    # processed in order.
+    while generations:
+      generation = generations.pop(0)
+      tasks = [task for task in generation.Pool()]
+
+      # Test that all the tasks are processed once and only once.
+      while tasks:
+        task = result_queue.get()
+
+        assert task in tasks
+        tasks.remove(task)
+
+        input_queue.put(task)
+
+    task = result_queue.get()
+
+    # Test that the steering algorithm returns properly after processing all
+    # the generations.
+    assert task == pipeline_process.POISONPILL
+
+    steering_process.join()
+
+  def testCache(self):
+    """The steering algorithm returns immediately if there is no new tasks.
+
+    If all the new tasks have been cached before, the steering algorithm does
+    not have to execute these tasks again and thus can terminate right away.
+    """
+
+    # Put a set of tasks in the cache and add this set to initial generation.
+    test_ranges = range(NUMBER_OF_TASKS)
+    tasks = [IdentifierMockTask(STEERING_TEST_STAGE, t) for t in test_ranges]
+    steering_tasks = set(tasks)
+
+    current_generation = MockGeneration(steering_tasks, None)
+
+    # Set up the input and result queue for the steering method.
+    manager = multiprocessing.Manager()
+    input_queue = manager.Queue()
+    result_queue = manager.Queue()
+
+    steering_process = multiprocessing.Process(target=steering.Steering,
+                                               args=(steering_tasks,
+                                                     [current_generation],
+                                                     input_queue, result_queue))
+
+    steering_process.start()
+
+    # Test that the steering method returns right away.
+    assert result_queue.get() == pipeline_process.POISONPILL
+    steering_process.join()
 
 if __name__ == '__main__':
   unittest.main()