Yuheng Long | 761748d | 2013-06-28 09:32:43 -0700 | [diff] [blame^] | 1 | """Unittest for the pipeline_worker functions in the build/test stage. |
| 2 | |
| 3 | This module tests the helper method and the worker method. |
| 4 | """ |
| 5 | |
| 6 | __author__ = 'yuhenglong@google.com (Yuheng Long)' |
| 7 | |
| 8 | import multiprocessing |
| 9 | import random |
| 10 | import sys |
| 11 | import unittest |
| 12 | |
| 13 | import pipeline_process |
| 14 | import pipeline_worker |
| 15 | |
| 16 | |
| 17 | TESTSTAGE = 0 |
| 18 | |
| 19 | |
| 20 | def MockTaskCostGenerator(): |
| 21 | """Calls a random number generator and returns a negative number.""" |
| 22 | return random.randint(-sys.maxint - 1, -1) |
| 23 | |
| 24 | |
| 25 | class MockTask(object): |
| 26 | """This class emulates an actual task. |
| 27 | |
| 28 | It does not do the actual work, but simply returns the result as given when |
| 29 | this task is constructed. |
| 30 | """ |
| 31 | |
| 32 | def __init__(self, identifier, cost): |
| 33 | """Set up the results for this task. |
| 34 | |
| 35 | Args: |
| 36 | identifier: the identifier of this task. |
| 37 | cost: the mock cost of this task. |
| 38 | |
| 39 | The _pre_cost field stores the cost. Once this task is performed, i.e., by |
| 40 | calling the work method , the _cost field will have this cost. |
| 41 | """ |
| 42 | |
| 43 | self._identifier = identifier |
| 44 | self._pre_cost = cost |
| 45 | |
| 46 | def get_identifier(self, stage): |
| 47 | assert stage == TESTSTAGE |
| 48 | return self._identifier |
| 49 | |
| 50 | def __eq__(self, other): |
| 51 | if isinstance(other, MockTask): |
| 52 | return self._identifier == other._identifier and self._cost == other._cost |
| 53 | return False |
| 54 | |
| 55 | def set_result(self, stage, cost): |
| 56 | assert stage == TESTSTAGE |
| 57 | self._cost = cost |
| 58 | |
| 59 | def work(self, stage): |
| 60 | assert stage == TESTSTAGE |
| 61 | self._cost = self._pre_cost |
| 62 | |
| 63 | def get_result(self, stage): |
| 64 | assert stage == TESTSTAGE |
| 65 | return self._cost |
| 66 | |
| 67 | def done(self, stage): |
| 68 | """Indicates whether the task has been performed.""" |
| 69 | |
| 70 | assert stage == TESTSTAGE |
| 71 | return '_cost' in self.__dict__ |
| 72 | |
| 73 | |
| 74 | class AuxiliaryTest(unittest.TestCase): |
| 75 | """This class tests the pipeline_worker functions. |
| 76 | |
| 77 | Given the same identifier, the cost should result the same from the |
| 78 | pipeline_worker functions. |
| 79 | """ |
| 80 | |
| 81 | def testHelper(self): |
| 82 | """"Test the helper. |
| 83 | |
| 84 | Call the helper method twice, and test the results. The results should be |
| 85 | the same, i.e., the cost should be the same. |
| 86 | """ |
| 87 | |
| 88 | # Set up the input, helper and output queue for the helper method. |
| 89 | manager = multiprocessing.Manager() |
| 90 | helper_queue = manager.Queue() |
| 91 | result_queue = manager.Queue() |
| 92 | completed_queue = manager.Queue() |
| 93 | |
| 94 | # Set up the helper process that holds the helper method. |
| 95 | helper_process = multiprocessing.Process(target=pipeline_worker.helper, |
| 96 | args=(TESTSTAGE, {}, helper_queue, |
| 97 | completed_queue, |
| 98 | result_queue)) |
| 99 | helper_process.start() |
| 100 | |
| 101 | # A dictionary defines the mock result to the helper. |
| 102 | mock_result = {1: 1995, 2: 59, 9: 1027} |
| 103 | |
| 104 | # Test if there is a task that is done before, whether the duplicate task |
| 105 | # will have the same result. Here, two different scenarios are tested. That |
| 106 | # is the mock results are added to the completed_queue before and after the |
| 107 | # corresponding mock tasks being added to the input queue. |
| 108 | completed_queue.put((9, mock_result[9])) |
| 109 | |
| 110 | # The output of the helper should contain all the following tasks. |
| 111 | results = [1, 1, 2, 9] |
| 112 | |
| 113 | # Testing the correctness of having tasks having the same identifier, here |
| 114 | # 1. |
| 115 | for result in results: |
| 116 | helper_queue.put(MockTask(result, MockTaskCostGenerator())) |
| 117 | |
| 118 | completed_queue.put((2, mock_result[2])) |
| 119 | completed_queue.put((1, mock_result[1])) |
| 120 | |
| 121 | # Signal there is no more duplicate task. |
| 122 | helper_queue.put(pipeline_process.POISONPILL) |
| 123 | helper_process.join() |
| 124 | |
| 125 | while results: |
| 126 | task = result_queue.get() |
| 127 | identifier = task._identifier |
| 128 | cost = task._cost |
| 129 | self.assertTrue(identifier in results) |
| 130 | if identifier in mock_result: |
| 131 | self.assertTrue(cost, mock_result[identifier]) |
| 132 | results.remove(task._identifier) |
| 133 | |
| 134 | def testWorker(self): |
| 135 | """"Test the worker method. |
| 136 | |
| 137 | The worker should process all the input tasks and output the tasks to the |
| 138 | helper and result queue. |
| 139 | """ |
| 140 | |
| 141 | manager = multiprocessing.Manager() |
| 142 | result_queue = manager.Queue() |
| 143 | completed_queue = manager.Queue() |
| 144 | |
| 145 | # A dictionary defines the mock tasks and their corresponding results. |
| 146 | mock_work_tasks = {1: 86, 2: 788} |
| 147 | |
| 148 | mock_tasks = [] |
| 149 | |
| 150 | for flag, cost in mock_work_tasks.iteritems(): |
| 151 | mock_tasks.append(MockTask(flag, cost)) |
| 152 | |
| 153 | # Submit the mock tasks to the worker. |
| 154 | for mock_task in mock_tasks: |
| 155 | pipeline_worker.worker(TESTSTAGE, mock_task, completed_queue, |
| 156 | result_queue) |
| 157 | |
| 158 | # The tasks, from the output queue, should be the same as the input and |
| 159 | # should be performed. |
| 160 | for task in mock_tasks: |
| 161 | output = result_queue.get() |
| 162 | self.assertEqual(output, task) |
| 163 | self.assertTrue(output.done(TESTSTAGE)) |
| 164 | |
| 165 | # The tasks, from the completed_queue, should be defined in the |
| 166 | # mock_work_tasks dictionary. |
| 167 | for flag, cost in mock_work_tasks.iteritems(): |
| 168 | helper_input = completed_queue.get() |
| 169 | self.assertEqual(helper_input, (flag, cost)) |
| 170 | |
| 171 | |
| 172 | if __name__ == '__main__': |
| 173 | unittest.main() |