blob: 9eea6ad7365b67c92719c928f63f2a7b9d453774 [file] [log] [blame]
jamesren3e9f6092010-03-11 21:32:10 +00001import time, logging
2from autotest_lib.frontend.afe import model_attributes as afe_model_attributes
3from autotest_lib.frontend.shared import rest_client
jamesrendbeebf82010-04-08 22:58:26 +00004from autotest_lib.frontend.planner import model_attributes, support
jamesren3e9f6092010-03-11 21:32:10 +00005from autotest_lib.server import frontend
6
7
# Number of seconds the execution engine sleeps between polling iterations:
# between engine ticks, between retries of the set-atomic-group job, and
# between checks for plan initialization.
TICK_INTERVAL_SECS = 10
9
jamesrenc3940222010-02-19 21:57:37 +000010class ExecutionEngine(object):
11 """
12 Provides the Test Planner execution engine
13 """
14
jamesren3e9f6092010-03-11 21:32:10 +000015 _planner_rpc = frontend.Planner()
16 _tko_rpc = frontend.TKO()
17
18 def __init__(self, plan_id, server, label_name):
19 self._plan_id = plan_id
20 self._server = server
21 self._afe_rest = rest_client.Resource.load(
22 'http://%s/afe/server/resources' % server)
23 self._label_name = label_name
jamesrenc3940222010-02-19 21:57:37 +000024
25
26 def start(self):
27 """
28 Starts the execution engine.
29
30 Thread remains in this method until the execution engine is complete.
31 """
jamesren3e9f6092010-03-11 21:32:10 +000032 self._initialize_plan()
33
34 while True:
35 if self._tick():
36 break
37 time.sleep(TICK_INTERVAL_SECS)
38
39 self._cleanup()
40
41
42 def _initialize_plan(self):
43 """
44 Performs actions necessary to start a test plan.
45
46 Adds the hosts into the proper atomic group, and waits for the plan to
47 be ready to start before returning
48 """
49 plan = self._planner_rpc.run('get_plan', id=self._plan_id)
50 name = plan['name'] + '_set_atomic_group'
51 if not self._afe_rest.jobs.get(name=name).total_results:
52 self._launch_set_atomic_group_job(name)
53
54 self._wait_for_initialization()
55
56
57 def _launch_set_atomic_group_job(self, name):
58 """
59 Launch the job to set the hosts' atomic group, and initate the plan
60
61 If the hosts are already part of an atomic group, wait for a tick and
62 try again. Return when successful
63 """
64 while True:
65 hosts = self._planner_rpc.run('get_hosts', plan_id=self._plan_id)
66 control = self._planner_rpc.run('get_atomic_group_control_file')
67
68 info = self._afe_rest.execution_info.get().execution_info
69 info['control_file'] = control
70 info['cleanup_before_job'] = afe_model_attributes.RebootBefore.NEVER
71 info['cleanup_after_job'] = afe_model_attributes.RebootAfter.NEVER
72 info['run_verify'] = False
73 info['machines_per_execution'] = len(hosts)
74
75 entries = self._afe_rest.queue_entries_request.get(
76 hosts=hosts).queue_entries
77
78 keyvals = {'server': self._server,
79 'label_name': self._label_name,
80 'plan_id': self._plan_id}
81
82 job_req = {'name' : name,
83 'execution_info' : info,
84 'queue_entries' : entries,
85 'keyvals' : keyvals}
86
87 try:
88 self._afe_rest.jobs.post(job_req)
89 logging.info('created job to set atomic group')
90 break
91 except rest_client.ClientError, e:
92 logging.info('hosts already in atomic group')
93 logging.info('(error was %s)' % e.message)
94 logging.info('waiting...')
95 time.sleep(TICK_INTERVAL_SECS)
96
97
98 def _wait_for_initialization(self):
99 while True:
100 plan = self._planner_rpc.run('get_plan', id=self._plan_id)
101 if plan['initialized']:
102 break
103 logging.info('waiting for initialization...')
104 time.sleep(TICK_INTERVAL_SECS)
105
106
107 def _cleanup(self):
108 self._afe_rest.labels.get(name=self._label_name).members[0].delete()
109
110
111 def _tick(self):
112 """
113 Processes one tick of the execution engine.
114
115 Returns True if the engine has completed the plan.
116 """
117 logging.info('tick')
118 self._process_finished_runs()
119 self._check_tko_jobs()
120 return self._schedule_new_runs()
121
122
123 def _process_finished_runs(self):
124 """
125 Finalize the test runs that have finished.
126
127 Look for runs that are in PASSED or FAILED, perform any additional
128 processing required, and set the entry to 'finalized'.
129 """
130 Status = model_attributes.TestRunStatus
131 runs = self._planner_rpc.run('get_test_runs', plan__id=self._plan_id,
132 status__in=(Status.PASSED, Status.FAILED),
133 finalized=False)
134 for run in runs:
135 logging.info('finalizing test run %s', run)
jamesrendbeebf82010-04-08 22:58:26 +0000136
137 controller = support.TestPlanController(
138 machine=run['host']['host'],
139 test_alias=run['test_job']['test_config']['alias'])
140 self._run_execute_after(controller, tko_test_id=run['tko_test'],
141 success=(run['status'] == Status.PASSED))
142
143 if controller._fail:
144 raise NotImplemented('TODO: implement forced failure')
145
146 failed = (run['status'] == Status.FAILED or controller._fail)
147 if failed and not controller._unblock:
148 self._planner_rpc.run('modify_host', id=run['host']['id'],
jamesren3e9f6092010-03-11 21:32:10 +0000149 blocked=True)
150 self._planner_rpc.run('modify_test_run', id=run['id'],
151 finalized=True)
152
153
154 def _check_tko_jobs(self):
155 """
156 Instructs the server to update the Planner test runs table
157
158 Sends an RPC to have the server pull the proper TKO tests and add them
159 to the Planner tables. Logs information about what was added.
160 """
161 test_runs_updated = self._planner_rpc.run('update_test_runs',
162 plan_id=self._plan_id)
163 for update in test_runs_updated:
164 logging.info('added %s test run for tko test id %s (%s)',
165 update['status'], update['tko_test_idx'],
166 update['hostname'])
167
168
169 def _schedule_new_runs(self):
170 next_configs = self._planner_rpc.run('get_next_test_configs',
171 plan_id=self._plan_id)
172 if next_configs['complete']:
173 return True
174
175 for config in next_configs['next_configs']:
jamesrendbeebf82010-04-08 22:58:26 +0000176 config_id = config['next_test_config_id']
177 controller = support.TestPlanController(
178 machine=config['host'],
179 test_alias=config['next_test_config_alias'])
180 self._run_execute_before(controller)
181 if controller._skip:
182 self._planner_rpc.run('skip_test', test_config_id=config_id,
183 hostname=config['host'])
184 continue
185
jamesren3e9f6092010-03-11 21:32:10 +0000186 self._run_job(hostname=config['host'],
jamesrendbeebf82010-04-08 22:58:26 +0000187 test_config_id=config_id,
188 cleanup_before_job=controller._reboot_before,
189 cleanup_after_job=controller._reboot_after,
190 run_verify=controller._run_verify)
jamesren3e9f6092010-03-11 21:32:10 +0000191
192 return False
193
194
jamesrendbeebf82010-04-08 22:58:26 +0000195 def _run_job(self, hostname, test_config_id, cleanup_before_job,
196 cleanup_after_job, run_verify):
jamesren3e9f6092010-03-11 21:32:10 +0000197 test_config = self._planner_rpc.run('get_test_config',
198 id=test_config_id)
199
200 info = self._afe_rest.execution_info.get().execution_info
201 info['control_file'] = test_config['control_file']['contents']
202 info['is_server'] = test_config['is_server']
jamesrendbeebf82010-04-08 22:58:26 +0000203 info['cleanup_before_job'] = cleanup_before_job
204 info['cleanup_after_job'] = cleanup_after_job
205 info['run_verify'] = run_verify
jamesren3e9f6092010-03-11 21:32:10 +0000206
207 atomic_group_class = self._afe_rest.labels.get(
208 name=self._label_name).members[0].get().atomic_group_class.href
209
210 request = self._afe_rest.queue_entries_request.get(
211 hosts=(hostname,), atomic_group_class=atomic_group_class)
212 entries = request.queue_entries
213
214 plan = self._planner_rpc.run('get_plan', id=self._plan_id)
215 prefix = plan['label_override']
216 if prefix is None:
217 prefix = plan['name']
218 job_req = {'name' : '%s_%s_%s' % (prefix, test_config['alias'],
219 hostname),
220 'execution_info' : info,
221 'queue_entries' : entries}
222
223 logging.info('starting test alias %s for host %s',
224 test_config['alias'], hostname)
225 job = self._afe_rest.jobs.post(job_req)
226 self._planner_rpc.run('add_job',
227 plan_id=self._plan_id,
228 test_config_id=test_config_id,
229 afe_job_id=job.get().id)
jamesrendbeebf82010-04-08 22:58:26 +0000230
231
232 def _run_execute_before(self, controller):
233 """
234 Execute the global support's execute_before() for the plan
235 """
236 self._run_global_support(controller, 'execute_before')
237
238
239 def _run_execute_after(self, controller, tko_test_id, success):
240 """
241 Execute the global support's execute_after() for the plan
242 """
243 self._run_global_support(controller, 'execute_after',
244 tko_test_id=tko_test_id, success=success)
245
246
247 def _run_global_support(self, controller, function_name, **kwargs):
248 plan = self._planner_rpc.run('get_plan', id=self._plan_id)
249 if plan['support']:
250 context = {'model_attributes': afe_model_attributes}
251 exec plan['support'] in context
252 function = context.get(function_name)
253 if function:
254 if not callable(function):
255 raise Exception('Global support defines %s, but it is not '
256 'callable' % function_name)
257 function(controller, **kwargs)