blob: c0d164381f173e027c4c0f26b6eef631b7507317 [file] [log] [blame]
jamesren3e9f6092010-03-11 21:32:10 +00001import time, logging
2from autotest_lib.frontend.afe import model_attributes as afe_model_attributes
3from autotest_lib.frontend.shared import rest_client
4from autotest_lib.frontend.planner import model_attributes
5from autotest_lib.server import frontend
6
7
8TICK_INTERVAL_SECS = 10
9
jamesrenc3940222010-02-19 21:57:37 +000010class ExecutionEngine(object):
11 """
12 Provides the Test Planner execution engine
13 """
14
jamesren3e9f6092010-03-11 21:32:10 +000015 _planner_rpc = frontend.Planner()
16 _tko_rpc = frontend.TKO()
17
18 def __init__(self, plan_id, server, label_name):
19 self._plan_id = plan_id
20 self._server = server
21 self._afe_rest = rest_client.Resource.load(
22 'http://%s/afe/server/resources' % server)
23 self._label_name = label_name
jamesrenc3940222010-02-19 21:57:37 +000024
25
26 def start(self):
27 """
28 Starts the execution engine.
29
30 Thread remains in this method until the execution engine is complete.
31 """
jamesren3e9f6092010-03-11 21:32:10 +000032 self._initialize_plan()
33
34 while True:
35 if self._tick():
36 break
37 time.sleep(TICK_INTERVAL_SECS)
38
39 self._cleanup()
40
41
42 def _initialize_plan(self):
43 """
44 Performs actions necessary to start a test plan.
45
46 Adds the hosts into the proper atomic group, and waits for the plan to
47 be ready to start before returning
48 """
49 plan = self._planner_rpc.run('get_plan', id=self._plan_id)
50 name = plan['name'] + '_set_atomic_group'
51 if not self._afe_rest.jobs.get(name=name).total_results:
52 self._launch_set_atomic_group_job(name)
53
54 self._wait_for_initialization()
55
56
57 def _launch_set_atomic_group_job(self, name):
58 """
59 Launch the job to set the hosts' atomic group, and initate the plan
60
61 If the hosts are already part of an atomic group, wait for a tick and
62 try again. Return when successful
63 """
64 while True:
65 hosts = self._planner_rpc.run('get_hosts', plan_id=self._plan_id)
66 control = self._planner_rpc.run('get_atomic_group_control_file')
67
68 info = self._afe_rest.execution_info.get().execution_info
69 info['control_file'] = control
70 info['cleanup_before_job'] = afe_model_attributes.RebootBefore.NEVER
71 info['cleanup_after_job'] = afe_model_attributes.RebootAfter.NEVER
72 info['run_verify'] = False
73 info['machines_per_execution'] = len(hosts)
74
75 entries = self._afe_rest.queue_entries_request.get(
76 hosts=hosts).queue_entries
77
78 keyvals = {'server': self._server,
79 'label_name': self._label_name,
80 'plan_id': self._plan_id}
81
82 job_req = {'name' : name,
83 'execution_info' : info,
84 'queue_entries' : entries,
85 'keyvals' : keyvals}
86
87 try:
88 self._afe_rest.jobs.post(job_req)
89 logging.info('created job to set atomic group')
90 break
91 except rest_client.ClientError, e:
92 logging.info('hosts already in atomic group')
93 logging.info('(error was %s)' % e.message)
94 logging.info('waiting...')
95 time.sleep(TICK_INTERVAL_SECS)
96
97
98 def _wait_for_initialization(self):
99 while True:
100 plan = self._planner_rpc.run('get_plan', id=self._plan_id)
101 if plan['initialized']:
102 break
103 logging.info('waiting for initialization...')
104 time.sleep(TICK_INTERVAL_SECS)
105
106
107 def _cleanup(self):
108 self._afe_rest.labels.get(name=self._label_name).members[0].delete()
109
110
111 def _tick(self):
112 """
113 Processes one tick of the execution engine.
114
115 Returns True if the engine has completed the plan.
116 """
117 logging.info('tick')
118 self._process_finished_runs()
119 self._check_tko_jobs()
120 return self._schedule_new_runs()
121
122
123 def _process_finished_runs(self):
124 """
125 Finalize the test runs that have finished.
126
127 Look for runs that are in PASSED or FAILED, perform any additional
128 processing required, and set the entry to 'finalized'.
129 """
130 Status = model_attributes.TestRunStatus
131 runs = self._planner_rpc.run('get_test_runs', plan__id=self._plan_id,
132 status__in=(Status.PASSED, Status.FAILED),
133 finalized=False)
134 for run in runs:
135 logging.info('finalizing test run %s', run)
136 if run['status'] == Status.FAILED:
137 self._planner_rpc.run('modify_host', id=run['host'],
138 blocked=True)
139 self._planner_rpc.run('modify_test_run', id=run['id'],
140 finalized=True)
141
142
143 def _check_tko_jobs(self):
144 """
145 Instructs the server to update the Planner test runs table
146
147 Sends an RPC to have the server pull the proper TKO tests and add them
148 to the Planner tables. Logs information about what was added.
149 """
150 test_runs_updated = self._planner_rpc.run('update_test_runs',
151 plan_id=self._plan_id)
152 for update in test_runs_updated:
153 logging.info('added %s test run for tko test id %s (%s)',
154 update['status'], update['tko_test_idx'],
155 update['hostname'])
156
157
158 def _schedule_new_runs(self):
159 next_configs = self._planner_rpc.run('get_next_test_configs',
160 plan_id=self._plan_id)
161 if next_configs['complete']:
162 return True
163
164 for config in next_configs['next_configs']:
165 self._run_job(hostname=config['host'],
166 test_config_id=config['next_test_config_id'])
167
168 return False
169
170
171 def _run_job(self, hostname, test_config_id):
172 test_config = self._planner_rpc.run('get_test_config',
173 id=test_config_id)
174
175 info = self._afe_rest.execution_info.get().execution_info
176 info['control_file'] = test_config['control_file']['contents']
177 info['is_server'] = test_config['is_server']
178
179 atomic_group_class = self._afe_rest.labels.get(
180 name=self._label_name).members[0].get().atomic_group_class.href
181
182 request = self._afe_rest.queue_entries_request.get(
183 hosts=(hostname,), atomic_group_class=atomic_group_class)
184 entries = request.queue_entries
185
186 plan = self._planner_rpc.run('get_plan', id=self._plan_id)
187 prefix = plan['label_override']
188 if prefix is None:
189 prefix = plan['name']
190 job_req = {'name' : '%s_%s_%s' % (prefix, test_config['alias'],
191 hostname),
192 'execution_info' : info,
193 'queue_entries' : entries}
194
195 logging.info('starting test alias %s for host %s',
196 test_config['alias'], hostname)
197 job = self._afe_rest.jobs.post(job_req)
198 self._planner_rpc.run('add_job',
199 plan_id=self._plan_id,
200 test_config_id=test_config_id,
201 afe_job_id=job.get().id)