blob: 340b3011e592f2d2088556696eda7a45b320816f [file] [log] [blame]
Yuheng Long761748d2013-06-28 09:32:43 -07001"""Unittest for the pipeline_worker functions in the build/test stage.
2
3This module tests the helper method and the worker method.
4"""
5
6__author__ = 'yuhenglong@google.com (Yuheng Long)'
7
8import multiprocessing
9import random
10import sys
11import unittest
12
13import pipeline_process
14import pipeline_worker
15
16
17TESTSTAGE = 0
18
19
20def MockTaskCostGenerator():
21 """Calls a random number generator and returns a negative number."""
22 return random.randint(-sys.maxint - 1, -1)
23
24
25class MockTask(object):
26 """This class emulates an actual task.
27
28 It does not do the actual work, but simply returns the result as given when
29 this task is constructed.
30 """
31
32 def __init__(self, identifier, cost):
33 """Set up the results for this task.
34
35 Args:
36 identifier: the identifier of this task.
37 cost: the mock cost of this task.
38
39 The _pre_cost field stores the cost. Once this task is performed, i.e., by
40 calling the work method , the _cost field will have this cost.
41 """
42
43 self._identifier = identifier
44 self._pre_cost = cost
45
46 def get_identifier(self, stage):
47 assert stage == TESTSTAGE
48 return self._identifier
49
50 def __eq__(self, other):
51 if isinstance(other, MockTask):
52 return self._identifier == other._identifier and self._cost == other._cost
53 return False
54
55 def set_result(self, stage, cost):
56 assert stage == TESTSTAGE
57 self._cost = cost
58
59 def work(self, stage):
60 assert stage == TESTSTAGE
61 self._cost = self._pre_cost
62
63 def get_result(self, stage):
64 assert stage == TESTSTAGE
65 return self._cost
66
67 def done(self, stage):
68 """Indicates whether the task has been performed."""
69
70 assert stage == TESTSTAGE
71 return '_cost' in self.__dict__
72
73
74class AuxiliaryTest(unittest.TestCase):
75 """This class tests the pipeline_worker functions.
76
77 Given the same identifier, the cost should result the same from the
78 pipeline_worker functions.
79 """
80
81 def testHelper(self):
82 """"Test the helper.
83
84 Call the helper method twice, and test the results. The results should be
85 the same, i.e., the cost should be the same.
86 """
87
88 # Set up the input, helper and output queue for the helper method.
89 manager = multiprocessing.Manager()
90 helper_queue = manager.Queue()
91 result_queue = manager.Queue()
92 completed_queue = manager.Queue()
93
94 # Set up the helper process that holds the helper method.
95 helper_process = multiprocessing.Process(target=pipeline_worker.helper,
96 args=(TESTSTAGE, {}, helper_queue,
97 completed_queue,
98 result_queue))
99 helper_process.start()
100
101 # A dictionary defines the mock result to the helper.
102 mock_result = {1: 1995, 2: 59, 9: 1027}
103
104 # Test if there is a task that is done before, whether the duplicate task
105 # will have the same result. Here, two different scenarios are tested. That
106 # is the mock results are added to the completed_queue before and after the
107 # corresponding mock tasks being added to the input queue.
108 completed_queue.put((9, mock_result[9]))
109
110 # The output of the helper should contain all the following tasks.
111 results = [1, 1, 2, 9]
112
113 # Testing the correctness of having tasks having the same identifier, here
114 # 1.
115 for result in results:
116 helper_queue.put(MockTask(result, MockTaskCostGenerator()))
117
118 completed_queue.put((2, mock_result[2]))
119 completed_queue.put((1, mock_result[1]))
120
121 # Signal there is no more duplicate task.
122 helper_queue.put(pipeline_process.POISONPILL)
123 helper_process.join()
124
125 while results:
126 task = result_queue.get()
127 identifier = task._identifier
128 cost = task._cost
129 self.assertTrue(identifier in results)
130 if identifier in mock_result:
131 self.assertTrue(cost, mock_result[identifier])
132 results.remove(task._identifier)
133
134 def testWorker(self):
135 """"Test the worker method.
136
137 The worker should process all the input tasks and output the tasks to the
138 helper and result queue.
139 """
140
141 manager = multiprocessing.Manager()
142 result_queue = manager.Queue()
143 completed_queue = manager.Queue()
144
145 # A dictionary defines the mock tasks and their corresponding results.
146 mock_work_tasks = {1: 86, 2: 788}
147
148 mock_tasks = []
149
150 for flag, cost in mock_work_tasks.iteritems():
151 mock_tasks.append(MockTask(flag, cost))
152
153 # Submit the mock tasks to the worker.
154 for mock_task in mock_tasks:
155 pipeline_worker.worker(TESTSTAGE, mock_task, completed_queue,
156 result_queue)
157
158 # The tasks, from the output queue, should be the same as the input and
159 # should be performed.
160 for task in mock_tasks:
161 output = result_queue.get()
162 self.assertEqual(output, task)
163 self.assertTrue(output.done(TESTSTAGE))
164
165 # The tasks, from the completed_queue, should be defined in the
166 # mock_work_tasks dictionary.
167 for flag, cost in mock_work_tasks.iteritems():
168 helper_input = completed_queue.get()
169 self.assertEqual(helper_input, (flag, cost))
170
171
172if __name__ == '__main__':
173 unittest.main()