Yuheng Long | 761748d | 2013-06-28 09:32:43 -0700 | [diff] [blame] | 1 | """The pipeline_worker functions of the build and test stage of the framework. |
| 2 | |
Yuheng Long | 49358b7 | 2013-07-10 14:45:29 -0700 | [diff] [blame^] | 3 | Part of the Chrome build flags optimization. |
| 4 | |
Yuheng Long | 761748d | 2013-06-28 09:32:43 -0700 | [diff] [blame] | 5 | This module defines the helper and the worker. If duplicate tasks, for example |
| 6 | t1 and t2, need to be built/tested, only one of them, for example t1, will be |
| 7 | built/tested; the helper waits for the result of t1 and sets the result of the |
| 8 | other task, t2 here, to be the same as that of t1. Setting the result of t2 to |
| 9 | be the same as that of t1 is referred to as resolving the result of t2. |
| 10 | The worker invokes the work method of the tasks that are not duplicates. |
| 11 | """ |
| 12 | |
| 13 | __author__ = 'yuhenglong@google.com (Yuheng Long)' |
| 14 | |
| 15 | import pipeline_process |
| 16 | |
| 17 | |
def helper(stage, done_dict, helper_queue, completed_queue, result_queue):
  """Resolve the results of duplicate tasks for one pipeline stage.

  Repeatedly polls helper_queue for duplicate tasks and completed_queue for
  finished results until the poison pill is read from helper_queue. A duplicate
  task is not performed itself; its result is copied from the entry in
  done_dict that carries the same identifier. Once the poison pill has been
  seen, the method keeps polling completed_queue until every duplicate that is
  still waiting has been resolved.

  Neither queue is read with a blocking call: each is checked with empty()
  first, so the loops keep iterating while the queues are idle.

  Args:
    stage: The current stage of the pipeline, for example, build stage or test
      stage.
    done_dict: A dictionary of finished work. The key is the identifier of a
      task and the value is the result recorded for that identifier. It is read
      here and filled in by get_result_from_completed_queue.
    helper_queue: A queue of duplicate tasks whose results need to be resolved,
      terminated by pipeline_process.POISONPILL. This is the communication
      channel between the pipeline_process and this helper process.
    completed_queue: A queue of results of tasks that have been built/tested.
      These results are needed to resolve the duplicate tasks. This is the
      communication channel between the workers and this helper process.
    result_queue: After the result of a duplicate task has been resolved, the
      task is sent to the next stage via this queue.
  """

  # Duplicates that arrived before the result they depend on was available.
  pending = []

  while True:
    # Take at most one duplicate task per iteration so that the completed
    # queue below is polled just as often as the helper queue.
    if not helper_queue.empty():
      duplicate = helper_queue.get()

      if duplicate == pipeline_process.POISONPILL:
        # No further duplicate tasks will arrive on the helper queue.
        break

      # A duplicate handed to the helper must not carry a result yet.
      assert not duplicate.done(stage)

      key = duplicate.get_identifier(stage)

      if key not in done_dict:
        # The matching result has not arrived yet; park the duplicate until
        # it shows up on the completed queue.
        pending.append(duplicate)
      else:
        # The matching result is already known, so resolve immediately.
        duplicate.set_result(stage, done_dict[key])
        result_queue.put(duplicate)

    # Record any newly completed result and release the duplicates it unblocks.
    get_result_from_completed_queue(stage, completed_queue, done_dict, pending,
                                    result_queue)

  # The helper queue is exhausted; keep draining the completed queue until
  # every parked duplicate has been resolved.
  while pending:
    get_result_from_completed_queue(stage, completed_queue, done_dict, pending,
                                    result_queue)
| 81 | |
| 82 | |
def get_result_from_completed_queue(stage, completed_queue, done_dict,
                                    waiting_list, result_queue):
  """Pull one result from the completed queue and resolve its duplicates.

  If completed_queue is empty this method does nothing. Otherwise it takes a
  single (identifier, result) pair from the queue, records the result in
  done_dict, and resolves every task in waiting_list whose identifier for this
  stage equals the received identifier: the task gets the result, is put on
  result_queue, and is dropped from waiting_list. The relative order of the
  tasks that stay in waiting_list is preserved.

  Args:
    stage: The current stage of the pipeline, for example, build stage or test
      stage.
    completed_queue: A queue of (identifier, result) pairs for tasks that have
      been performed. These results are needed to resolve the duplicate tasks.
      This is the communication channel between the workers and this method.
    done_dict: A dictionary of tasks that are done. The key is the identifier
      of the task and the value is the result of performing it. Updated in
      place.
    waiting_list: The list of duplicate tasks, the results of which need to be
      resolved. Updated in place.
    result_queue: After the results of the duplicate tasks have been resolved,
      the duplicate tasks will be sent to the next stage via this queue.
  """
  # Nothing has completed since the last poll.
  if completed_queue.empty():
    return

  identifier, result = completed_queue.get()
  done_dict[identifier] = result

  # Split the waiting list in a single pass. Selecting tasks by position,
  # instead of calling list.remove (which matches with == and rescans the list
  # for every removal), guarantees that exactly the resolved task objects are
  # dropped even when distinct tasks compare equal.
  resolved = []
  still_waiting = []
  for task in waiting_list:
    if task.get_identifier(stage) == identifier:
      resolved.append(task)
    else:
      still_waiting.append(task)

  for duplicate_task in resolved:
    duplicate_task.set_result(stage, result)
    result_queue.put(duplicate_task)

  # Mutate in place: the caller keeps using the same list object.
  waiting_list[:] = still_waiting
| 116 | |
| 117 | |
def worker(stage, task, helper_queue, result_queue):
  """Perform a single task and publish its outcome.

  Runs the work method of the task for the given stage, then reports the
  (identifier, result) pair to the helper and forwards the task itself to the
  next stage.

  Args:
    stage: The current stage of the pipeline, for example, build stage or test
      stage.
    task: Input task that needs to be performed. It must not be done for this
      stage yet.
    helper_queue: Queue that receives the (identifier, result) pair of the
      completed task. This is the communication channel between the worker and
      the helper.
    result_queue: Queue that receives the completed task. This is the
      communication channel between the worker and the next stage.
  """

  # Only tasks without a result for this stage may be handed to a worker.
  assert not task.done(stage)

  task.work(stage)

  # Tell the helper first so that waiting duplicates can be resolved, then
  # pass the task on to the next stage.
  identifier = task.get_identifier(stage)
  outcome = task.get_result(stage)
  helper_queue.put((identifier, outcome))
  result_queue.put(task)