import logging
import time

from autotest_lib.frontend.afe import model_attributes as afe_model_attributes
from autotest_lib.frontend.shared import rest_client
from autotest_lib.frontend.planner import model_attributes, support
from autotest_lib.server import frontend


# Seconds to sleep between execution-engine ticks, and between retries while
# polling for the atomic-group job / plan initialization.
TICK_INTERVAL_SECS = 10

class ExecutionEngine(object):
    """
    Provides the Test Planner execution engine.

    The engine drives a single test plan: it launches the job that puts the
    plan's hosts into an atomic group, waits for the plan to be marked
    initialized, and then repeatedly "ticks" (finalize finished runs, sync
    TKO results, schedule new jobs) until the planner reports the plan as
    complete.
    """

    # RPC proxies shared by all engine instances. They are created when the
    # class body is executed, i.e. at import time of this module.
    _planner_rpc = frontend.Planner()
    _tko_rpc = frontend.TKO()


    def __init__(self, plan_id, server, label_name):
        """
        @param plan_id: ID of the test plan, passed to the planner RPCs.
        @param server: host name of the AFE server; used to build the REST
                resource URL and passed to the atomic group job as a keyval.
        @param label_name: name of the label that is looked up through the
                AFE REST interface when running jobs and on cleanup.
        """
        self._plan_id = plan_id
        self._server = server
        self._afe_rest = rest_client.Resource.load(
                'http://%s/afe/server/resources' % server)
        self._label_name = label_name


    def start(self):
        """
        Starts the execution engine.

        Thread remains in this method until the execution engine is complete.
        """
        self._initialize_plan()

        # Tick until the planner reports the plan as complete, sleeping
        # between ticks.
        while not self._tick():
            time.sleep(TICK_INTERVAL_SECS)

        self._cleanup()


    def _initialize_plan(self):
        """
        Performs actions necessary to start a test plan.

        Adds the hosts into the proper atomic group, and waits for the plan to
        be ready to start before returning.
        """
        plan = self._planner_rpc.run('get_plan', id=self._plan_id)
        name = plan['name'] + '_set_atomic_group'

        # Only launch the job if no job with this name exists yet.
        if not self._afe_rest.jobs.get(name=name).total_results:
            self._launch_set_atomic_group_job(name)

        self._wait_for_initialization()


    def _launch_set_atomic_group_job(self, name):
        """
        Launch the job to set the hosts' atomic group, and initiate the plan.

        If the hosts are already part of an atomic group, wait for a tick and
        try again. Return when successful.

        @param name: name to give to the AFE job.
        """
        while True:
            hosts = self._planner_rpc.run('get_hosts', plan_id=self._plan_id)
            control = self._planner_rpc.run('get_atomic_group_control_file')

            info = self._afe_rest.execution_info.get().execution_info
            info['control_file'] = control
            info['cleanup_before_job'] = afe_model_attributes.RebootBefore.NEVER
            info['cleanup_after_job'] = afe_model_attributes.RebootAfter.NEVER
            info['run_verify'] = False
            info['machines_per_execution'] = len(hosts)

            entries = self._afe_rest.queue_entries_request.get(
                    hosts=hosts).queue_entries

            keyvals = {'server': self._server,
                       'label_name': self._label_name,
                       'plan_id': self._plan_id}

            job_req = {'name': name,
                       'execution_info': info,
                       'queue_entries': entries,
                       'keyvals': keyvals}

            try:
                self._afe_rest.jobs.post(job_req)
            except rest_client.ClientError as e:
                # Any client error from the POST is treated as "hosts already
                # in an atomic group"; wait one interval and retry.
                logging.info('hosts already in atomic group')
                # Log the exception itself rather than the deprecated
                # BaseException.message attribute.
                logging.info('(error was %s)', e)
                logging.info('waiting...')
                time.sleep(TICK_INTERVAL_SECS)
            else:
                logging.info('created job to set atomic group')
                break


    def _wait_for_initialization(self):
        """
        Polls the planner until the plan's 'initialized' field is true.

        Sleeps TICK_INTERVAL_SECS between polls.
        """
        while True:
            plan = self._planner_rpc.run('get_plan', id=self._plan_id)
            if plan['initialized']:
                break
            logging.info('waiting for initialization...')
            time.sleep(TICK_INTERVAL_SECS)


    def _cleanup(self):
        """
        Deletes the first label returned by the AFE REST interface for
        self._label_name.
        """
        self._afe_rest.labels.get(name=self._label_name).members[0].delete()


    def _tick(self):
        """
        Processes one tick of the execution engine.

        Returns True if the engine has completed the plan.
        """
        logging.info('tick')
        self._process_finished_runs()
        self._check_tko_jobs()
        return self._schedule_new_runs()


    def _process_finished_runs(self):
        """
        Finalize the test runs that have finished.

        Look for runs that are in PASSED or FAILED, perform any additional
        processing required, and set the entry to 'finalized'.

        @raises NotImplementedError: if the plan's execute_after() hook
                requested a forced failure, which is not supported yet.
        """
        Status = model_attributes.TestRunStatus
        runs = self._planner_rpc.run('get_test_runs', plan__id=self._plan_id,
                                     status__in=(Status.PASSED, Status.FAILED),
                                     finalized=False)
        for run in runs:
            logging.info('finalizing test run %s', run)

            controller = support.TestPlanController(
                    machine=run['host']['host'],
                    test_alias=run['test_job']['test_config']['alias'])
            self._run_execute_after(controller, tko_test_id=run['tko_test'],
                                    success=(run['status'] == Status.PASSED))

            if controller._fail:
                # NotImplemented is a non-callable singleton; the exception
                # class is NotImplementedError.
                raise NotImplementedError('TODO: implement forced failure')

            failed = (run['status'] == Status.FAILED or controller._fail)
            # Block the host on failure unless the hook asked to unblock it.
            if failed and not controller._unblock:
                self._planner_rpc.run('modify_host', id=run['host']['id'],
                                      blocked=True)
            self._planner_rpc.run('modify_test_run', id=run['id'],
                                  finalized=True)


    def _check_tko_jobs(self):
        """
        Instructs the server to update the Planner test runs table.

        Sends an RPC to have the server pull the proper TKO tests and add them
        to the Planner tables. Logs information about what was added.
        """
        test_runs_updated = self._planner_rpc.run('update_test_runs',
                                                  plan_id=self._plan_id)
        for update in test_runs_updated:
            logging.info('added %s test run for tko test id %s (%s)',
                         update['status'], update['tko_test_idx'],
                         update['hostname'])


    def _schedule_new_runs(self):
        """
        Schedules the next test configs reported by the planner.

        For each pending config, runs the plan's execute_before() hook; the
        test is either skipped (if the hook requested it) or launched as a
        new job.

        Returns True if the planner reports the plan as complete, False
        otherwise.
        """
        next_configs = self._planner_rpc.run('get_next_test_configs',
                                             plan_id=self._plan_id)
        if next_configs['complete']:
            return True

        for config in next_configs['next_configs']:
            config_id = config['next_test_config_id']
            controller = support.TestPlanController(
                    machine=config['host'],
                    test_alias=config['next_test_config_alias'])
            self._run_execute_before(controller)
            if controller._skip:
                self._planner_rpc.run('skip_test', test_config_id=config_id,
                                      hostname=config['host'])
                continue

            self._run_job(hostname=config['host'],
                          test_config_id=config_id,
                          cleanup_before_job=controller._reboot_before,
                          cleanup_after_job=controller._reboot_after,
                          run_verify=controller._run_verify)

        return False


    def _run_job(self, hostname, test_config_id, cleanup_before_job,
                 cleanup_after_job, run_verify):
        """
        Creates an AFE job for one test config on one host and registers it
        with the planner.

        @param hostname: host to run the job on.
        @param test_config_id: ID of the planner test config to run.
        @param cleanup_before_job: value for the job's 'cleanup_before_job'
                execution info field.
        @param cleanup_after_job: value for the job's 'cleanup_after_job'
                execution info field.
        @param run_verify: value for the job's 'run_verify' execution info
                field.
        """
        test_config = self._planner_rpc.run('get_test_config',
                                            id=test_config_id)

        info = self._afe_rest.execution_info.get().execution_info
        info['control_file'] = test_config['control_file']['contents']
        info['is_server'] = test_config['is_server']
        info['cleanup_before_job'] = cleanup_before_job
        info['cleanup_after_job'] = cleanup_after_job
        info['run_verify'] = run_verify

        atomic_group_class = self._afe_rest.labels.get(
                name=self._label_name).members[0].get().atomic_group_class.href

        request = self._afe_rest.queue_entries_request.get(
                hosts=(hostname,), atomic_group_class=atomic_group_class)
        entries = request.queue_entries

        # The job name is prefixed with the plan's label override when one is
        # set, and with the plan name otherwise.
        plan = self._planner_rpc.run('get_plan', id=self._plan_id)
        prefix = plan['label_override']
        if prefix is None:
            prefix = plan['name']
        job_req = {'name': '%s_%s_%s' % (prefix, test_config['alias'],
                                         hostname),
                   'execution_info': info,
                   'queue_entries': entries}

        logging.info('starting test alias %s for host %s',
                     test_config['alias'], hostname)
        job = self._afe_rest.jobs.post(job_req)
        self._planner_rpc.run('add_job',
                              plan_id=self._plan_id,
                              test_config_id=test_config_id,
                              afe_job_id=job.get().id)


    def _run_execute_before(self, controller):
        """
        Execute the global support's execute_before() for the plan.
        """
        self._run_global_support(controller, 'execute_before')


    def _run_execute_after(self, controller, tko_test_id, success):
        """
        Execute the global support's execute_after() for the plan.
        """
        self._run_global_support(controller, 'execute_after',
                                 tko_test_id=tko_test_id, success=success)


    def _run_global_support(self, controller, function_name, **kwargs):
        """
        Runs one function from the plan's global support code, if defined.

        The plan's 'support' source is executed in a fresh namespace that
        exposes the AFE model attributes as 'model_attributes'. If the
        namespace then contains function_name, it is called with the
        controller and the given keyword arguments.

        @param controller: object handed to the support function as its first
                argument.
        @param function_name: name of the support function to look up.
        @param kwargs: extra keyword arguments forwarded to the function.

        @raises Exception: if the support code defines function_name but the
                value is not callable.
        """
        plan = self._planner_rpc.run('get_plan', id=self._plan_id)
        if not plan['support']:
            return

        context = {'model_attributes': afe_model_attributes}
        # NOTE(review): this executes code stored in the plan with the
        # engine's privileges; presumably only trusted users can edit a plan's
        # support code -- confirm.
        # The tuple form of exec is equivalent to "exec code in context" on
        # Python 2 and is also valid on Python 3.
        exec(plan['support'], context)

        function = context.get(function_name)
        if function:
            if not callable(function):
                raise Exception('Global support defines %s, but it is not '
                                'callable' % function_name)
            function(controller, **kwargs)