blob: 4a1722a8c7b70c8bee55e7c4748728d5d67a0be4 [file] [log] [blame]
"""The pipeline_worker functions of the build and test stage of the framework.

Part of the Chrome build flags optimization.

This module defines the helper and the worker. If duplicate tasks, for
example t1 and t2, need to be built/tested, only one of them, for example t1,
will be built/tested. The helper waits for the result of t1 and sets the
result of the other task, t2 here, to be the same as that of t1. Setting the
result of t2 to be the same as that of t1 is referred to as resolving the
result of t2.
The worker invokes the work method of the tasks that are not duplicates.
"""
12
13__author__ = 'yuhenglong@google.com (Yuheng Long)'
14
15import pipeline_process
16
17
def helper(stage, done_dict, helper_queue, completed_queue, result_queue):
  """Helper that filters duplicate tasks.

  This method continuously pulls duplicate tasks from the helper_queue. The
  duplicate tasks need not be compiled/tested. This method also pulls completed
  results from the completed_queue and lets the results of the duplicate tasks
  be the same as those of their corresponding finished tasks.

  The method returns once the poison pill has been read from the helper_queue
  and every duplicate task that was still waiting has been resolved.

  Args:
    stage: The current stage of the pipeline, for example, build stage or test
      stage.
    done_dict: A dictionary of tasks that are done. The key of the dictionary is
      the identifier of the task. The value of the dictionary is the result of
      performing the corresponding task.
    helper_queue: A queue of duplicate tasks whose results need to be resolved.
      This is a communication channel between the pipeline_process and this
      helper process. A pipeline_process.POISONPILL item marks the end of the
      input.
    completed_queue: A queue of (identifier, result) pairs of tasks that have
      been built/tested. These results are needed to resolve the results of the
      duplicate tasks. This is the communication channel between the workers
      and this helper process.
    result_queue: After the results of the duplicate tasks have been resolved,
      the duplicate tasks will be sent to the next stage via this queue.
  """
  # Imported here so that the function brings its own dependency into scope.
  import time

  # Seconds to sleep when neither queue has anything to read. Without this
  # pause the polling loops below would spin and keep a CPU core busy for as
  # long as the workers are building/testing.
  idle_wait = 0.01

  # The list of duplicate tasks, the results of which need to be resolved.
  waiting_list = []

  while True:
    # Pull duplicate task from the helper queue.
    if not helper_queue.empty():
      task = helper_queue.get()

      if task == pipeline_process.POISONPILL:
        # Poison pill means no more duplicate task from the helper queue.
        break

      # The task has not been performed before.
      assert not task.done(stage)

      # The identifier of this task.
      identifier = task.get_identifier(stage)

      # If a duplicate task comes before the corresponding resolved results from
      # the completed_queue, it will be put in the waiting list. If the result
      # arrives before the duplicate task, the duplicate task will be resolved
      # right away.
      if identifier in done_dict:
        # This task has been encountered before and the result is available. The
        # result can be resolved right away.
        task.set_result(stage, done_dict[identifier])
        result_queue.put(task)
      else:
        waiting_list.append(task)
    elif completed_queue.empty():
      # Nothing to read on either queue: yield the CPU instead of spinning.
      time.sleep(idle_wait)

    # Check and get completed tasks from completed_queue.
    get_result_from_completed_queue(stage, completed_queue, done_dict,
                                    waiting_list, result_queue)

  # Wait to resolve the results of the remaining duplicate tasks.
  while waiting_list:
    if completed_queue.empty():
      # No new result yet: yield the CPU before polling again.
      time.sleep(idle_wait)

    get_result_from_completed_queue(stage, completed_queue, done_dict,
                                    waiting_list, result_queue)
81
82
def get_result_from_completed_queue(stage, completed_queue, done_dict,
                                    waiting_list, result_queue):
  """Pulls one result from the completed queue and resolves duplicate tasks.

  This helper method tries to pull a completed result from the completed queue.
  If the queue is empty, it returns without doing anything. Otherwise it records
  the result in done_dict and resolves all the relevant duplicate tasks in the
  waiting list. Relevant tasks are the tasks whose identifier at this stage is
  the same as the identifier of the result received from the completed_queue.

  Args:
    stage: The current stage of the pipeline, for example, build stage or test
      stage.
    completed_queue: A queue of (identifier, result) pairs of tasks that have
      been performed. These results are needed to resolve the results of the
      duplicate tasks. This is the communication channel between the workers
      and this method.
    done_dict: A dictionary of tasks that are done. The key of the dictionary is
      the identifier of the task. The value of the dictionary is the result of
      the corresponding task. It is updated in place.
    waiting_list: The list of duplicate tasks, the results of which need to be
      resolved. The resolved tasks are removed from this list in place.
    result_queue: After the results of the duplicate tasks have been resolved,
      the duplicate tasks will be sent to the next stage via this queue.
  """
  # Nothing has completed since the last poll.
  if completed_queue.empty():
    return

  # Pull completed task from the worker queue.
  identifier, result = completed_queue.get()
  done_dict[identifier] = result

  # Split the waiting list by identifier in a single pass. Selecting by
  # identifier (rather than removing with list.remove, which matches by
  # equality) guarantees that exactly the resolved task objects leave the list.
  resolved = []
  pending = []
  for task in waiting_list:
    if task.get_identifier(stage) == identifier:
      resolved.append(task)
    else:
      pending.append(task)

  # Slice assignment keeps the caller's list object and only changes its
  # contents.
  waiting_list[:] = pending

  for duplicate_task in resolved:
    duplicate_task.set_result(stage, result)
    result_queue.put(duplicate_task)
116
117
def worker(stage, task, helper_queue, result_queue):
  """Worker that performs the task.

  This method calls the work method of the input task and distributes the
  outcome to the helper and to the next stage.

  Args:
    stage: The current stage of the pipeline, for example, build stage or test
      stage.
    task: Input task that needs to be performed. It must not be done yet for
      this stage.
    helper_queue: Queue that receives an (identifier, result) tuple for the
      performed task. This is the communication channel between the worker and
      the helper.
    result_queue: Queue that receives the performed task itself. This is the
      communication channel between the worker and the next stage.
  """

  # A task must only be performed once per stage.
  already_done = task.done(stage)
  assert not already_done

  task.work(stage)

  # Tell the helper which identifier now has a result.
  identifier = task.get_identifier(stage)
  outcome = task.get_result(stage)
  helper_queue.put((identifier, outcome))

  # Hand the finished task to the next stage.
  result_queue.put(task)