import time
import logging

from autotest_lib.frontend.afe import model_attributes as afe_model_attributes
from autotest_lib.frontend.shared import rest_client
from autotest_lib.frontend.planner import model_attributes
from autotest_lib.server import frontend
| 6 | |
| 7 | |
# Number of seconds to sleep between execution-engine ticks, and between
# retries while polling the servers.
TICK_INTERVAL_SECS = 10
| 9 | |
class ExecutionEngine(object):
    """
    Provides the Test Planner execution engine.

    An engine drives a single test plan: it initializes the plan, then
    repeatedly "ticks" (finalizing finished test runs, asking the server to
    pull in new TKO results and scheduling new jobs) until the Planner
    reports that the plan is complete.
    """

    # The RPC clients are class attributes: they are created once, when the
    # class body is executed, and are shared by every engine instance.
    _planner_rpc = frontend.Planner()
    # NOTE(review): _tko_rpc is not referenced by any method in this class.
    _tko_rpc = frontend.TKO()

    def __init__(self, plan_id, server, label_name):
        """
        @param plan_id: id of the test plan to execute; passed to the Planner
                RPCs.
        @param server: host name of the AFE server; used to build the REST
                entry point 'http://<server>/afe/server/resources'.
        @param label_name: name of the AFE label used by this plan. It is
                handed to the set-atomic-group job as a keyval, looked up
                when scheduling jobs, and deleted on cleanup.
        """
        self._plan_id = plan_id
        self._server = server
        self._afe_rest = rest_client.Resource.load(
                'http://%s/afe/server/resources' % server)
        self._label_name = label_name


    def start(self):
        """
        Starts the execution engine.

        Thread remains in this method until the execution engine is complete.
        """
        self._initialize_plan()

        while True:
            if self._tick():
                break
            time.sleep(TICK_INTERVAL_SECS)

        # Cleanup is intentionally only reached on normal completion: if a
        # tick raises, the label is left in place.
        self._cleanup()


    def _initialize_plan(self):
        """
        Performs actions necessary to start a test plan.

        Adds the hosts into the proper atomic group, and waits for the plan to
        be ready to start before returning.

        The set-atomic-group job is named '<plan name>_set_atomic_group' and
        is only launched when the AFE has no job with that name yet.
        """
        plan = self._planner_rpc.run('get_plan', id=self._plan_id)
        name = plan['name'] + '_set_atomic_group'
        if not self._afe_rest.jobs.get(name=name).total_results:
            self._launch_set_atomic_group_job(name)

        self._wait_for_initialization()


    def _launch_set_atomic_group_job(self, name):
        """
        Launch the job to set the hosts' atomic group, and initiate the plan.

        If the hosts are already part of an atomic group, wait for a tick and
        try again. Return when successful.

        Every rest_client.ClientError raised while posting the job is treated
        as "hosts already in atomic group" and retried; the error itself is
        logged so that other causes remain visible.

        @param name: name to give the AFE job.
        """
        # The keyvals depend only on the engine's own state, so build them
        # once instead of on every retry.
        keyvals = {'server': self._server,
                   'label_name': self._label_name,
                   'plan_id': self._plan_id}

        while True:
            # Hosts, control file and execution info are re-fetched on every
            # attempt, exactly as before.
            hosts = self._planner_rpc.run('get_hosts', plan_id=self._plan_id)
            control = self._planner_rpc.run('get_atomic_group_control_file')

            info = self._afe_rest.execution_info.get().execution_info
            info['control_file'] = control
            info['cleanup_before_job'] = afe_model_attributes.RebootBefore.NEVER
            info['cleanup_after_job'] = afe_model_attributes.RebootAfter.NEVER
            info['run_verify'] = False
            info['machines_per_execution'] = len(hosts)

            entries = self._afe_rest.queue_entries_request.get(
                    hosts=hosts).queue_entries

            job_req = {'name': name,
                       'execution_info': info,
                       'queue_entries': entries,
                       'keyvals': keyvals}

            try:
                self._afe_rest.jobs.post(job_req)
            except rest_client.ClientError as e:
                logging.info('hosts already in atomic group')
                # Log the exception itself: BaseException.message is
                # deprecated (PEP 352) and does not exist on Python 3.
                logging.info('(error was %s)', e)
                logging.info('waiting...')
                time.sleep(TICK_INTERVAL_SECS)
            else:
                logging.info('created job to set atomic group')
                break


    def _wait_for_initialization(self):
        """
        Blocks until the Planner reports the plan as initialized.

        Polls 'get_plan' once per TICK_INTERVAL_SECS and returns as soon as
        the plan's 'initialized' field is true.
        """
        while True:
            plan = self._planner_rpc.run('get_plan', id=self._plan_id)
            if plan['initialized']:
                break
            logging.info('waiting for initialization...')
            time.sleep(TICK_INTERVAL_SECS)


    def _cleanup(self):
        """
        Deletes the plan's label from the AFE once the plan has completed.

        Only the first label returned for self._label_name is deleted. If no
        such label exists there is nothing to clean up, so a warning is
        logged instead of raising IndexError.
        """
        labels = self._afe_rest.labels.get(name=self._label_name).members
        if not labels:
            logging.warning('label %s not found; nothing to clean up',
                            self._label_name)
            return
        labels[0].delete()


    def _tick(self):
        """
        Processes one tick of the execution engine.

        Returns True if the engine has completed the plan.
        """
        logging.info('tick')
        self._process_finished_runs()
        self._check_tko_jobs()
        return self._schedule_new_runs()


    def _process_finished_runs(self):
        """
        Finalize the test runs that have finished.

        Look for runs that are in PASSED or FAILED, perform any additional
        processing required, and set the entry to 'finalized'.

        The only additional processing is for FAILED runs: the host the run
        executed on is marked as blocked.
        """
        Status = model_attributes.TestRunStatus
        runs = self._planner_rpc.run('get_test_runs', plan__id=self._plan_id,
                                     status__in=(Status.PASSED, Status.FAILED),
                                     finalized=False)
        for run in runs:
            logging.info('finalizing test run %s', run)
            if run['status'] == Status.FAILED:
                self._planner_rpc.run('modify_host', id=run['host'],
                                      blocked=True)
            self._planner_rpc.run('modify_test_run', id=run['id'],
                                  finalized=True)


    def _check_tko_jobs(self):
        """
        Instructs the server to update the Planner test runs table.

        Sends an RPC to have the server pull the proper TKO tests and add them
        to the Planner tables. Logs information about what was added.
        """
        test_runs_updated = self._planner_rpc.run('update_test_runs',
                                                  plan_id=self._plan_id)
        for update in test_runs_updated:
            logging.info('added %s test run for tko test id %s (%s)',
                         update['status'], update['tko_test_idx'],
                         update['hostname'])


    def _schedule_new_runs(self):
        """
        Starts a job for every host that has a test config ready to run.

        Returns True if the Planner reports the plan as complete (in which
        case nothing is scheduled), False otherwise.
        """
        next_configs = self._planner_rpc.run('get_next_test_configs',
                                             plan_id=self._plan_id)
        if next_configs['complete']:
            return True

        for config in next_configs['next_configs']:
            self._run_job(hostname=config['host'],
                          test_config_id=config['next_test_config_id'])

        return False


    def _run_job(self, hostname, test_config_id):
        """
        Creates an AFE job running one test config on one host, and registers
        the job with the Planner.

        The job is named '<prefix>_<test config alias>_<hostname>', where the
        prefix is the plan's label_override, or the plan's name when no
        override is set.

        @param hostname: name of the host to run the job on.
        @param test_config_id: id of the Planner test config to run.
        """
        test_config = self._planner_rpc.run('get_test_config',
                                            id=test_config_id)

        info = self._afe_rest.execution_info.get().execution_info
        info['control_file'] = test_config['control_file']['contents']
        info['is_server'] = test_config['is_server']

        # The queue entry request is built with the atomic group class of
        # the plan's label (the first label matching self._label_name).
        atomic_group_class = self._afe_rest.labels.get(
                name=self._label_name).members[0].get().atomic_group_class.href

        request = self._afe_rest.queue_entries_request.get(
                hosts=(hostname,), atomic_group_class=atomic_group_class)
        entries = request.queue_entries

        plan = self._planner_rpc.run('get_plan', id=self._plan_id)
        prefix = plan['label_override']
        if prefix is None:
            prefix = plan['name']
        job_req = {'name': '%s_%s_%s' % (prefix, test_config['alias'],
                                         hostname),
                   'execution_info': info,
                   'queue_entries': entries}

        logging.info('starting test alias %s for host %s',
                     test_config['alias'], hostname)
        job = self._afe_rest.jobs.post(job_req)
        self._planner_rpc.run('add_job',
                              plan_id=self._plan_id,
                              test_config_id=test_config_id,
                              afe_job_id=job.get().id)