blob: b59568570bfb373f43da08a62d7cf2d2554a86b7 [file] [log] [blame]
"""Pipeline Process unittest.
Part of the Chrome build flags optimization.
"""
__author__ = 'yuhenglong@google.com (Yuheng Long)'
import multiprocessing
import unittest
from mock_task import MockTask
import pipeline_process
# Pick an integer at random.
ERROR = -334
# Pick an integer at random.
TESTSTAGE = -8
def MockHelper(done_dict, helper_queue, _, result_queue):
"""This method echos input to the output."""
while True:
if not helper_queue.empty():
task = helper_queue.get()
if task == pipeline_process.POISONPILL:
# Poison pill means shutdown
break
if task in done_dict:
# verify that it does not get duplicate "1"s in the test.
result_queue.put(ERROR)
else:
result_queue.put(('helper', task.GetIdentifier(TESTSTAGE)))
def MockWorker(task, _, result_queue):
result_queue.put(('worker', task.GetIdentifier(TESTSTAGE)))
class PipelineProcessTest(unittest.TestCase):
"""This class test the PipelineProcess.
All the task inserted into the input queue should be taken out and hand to the
actual pipeline handler, except for the POISON_PILL. All these task should
also be passed to the next pipeline stage via the output queue.
"""
def setUp(self):
pass
def testRun(self):
"""Test the run method.
Ensure that all the tasks inserted into the queue are properly handled.
"""
manager = multiprocessing.Manager()
inp = manager.Queue()
output = manager.Queue()
process = pipeline_process.PipelineProcess(2, 'testing', {}, TESTSTAGE, inp,
MockHelper, MockWorker, output)
process.start()
inp.put(MockTask(TESTSTAGE, 1))
inp.put(MockTask(TESTSTAGE, 1))
inp.put(MockTask(TESTSTAGE, 2))
inp.put(pipeline_process.POISONPILL)
process.join()
# All tasks are processed once and only once.
result = [('worker', 1), ('helper', 1), ('worker', 2),
pipeline_process.POISONPILL]
while result:
task = output.get()
# One "1"s is passed to the worker and one to the helper.
self.assertNotEqual(task, ERROR)
# The messages received should be exactly the same as the result.
self.assertTrue(task in result)
result.remove(task)
if __name__ == '__main__':
unittest.main()