Implementation of Test Planner execution engine. Is currently able to
schedule single-host tests and place the results in the proper planner
tables for analysis.
TODO: global support object, execution_engine.py unit tests
Signed-off-by: James Ren <jamesren@google.com>
git-svn-id: http://test.kernel.org/svn/autotest/trunk@4301 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/frontend/planner/execution_engine.py b/frontend/planner/execution_engine.py
index 30016c9..c0d1643 100644
--- a/frontend/planner/execution_engine.py
+++ b/frontend/planner/execution_engine.py
@@ -1,10 +1,26 @@
+import time, logging
+from autotest_lib.frontend.afe import model_attributes as afe_model_attributes
+from autotest_lib.frontend.shared import rest_client
+from autotest_lib.frontend.planner import model_attributes
+from autotest_lib.server import frontend
+
+
+TICK_INTERVAL_SECS = 10
+
class ExecutionEngine(object):
"""
Provides the Test Planner execution engine
"""
- def __init__(self, plan_id):
- self.plan_id = plan_id
+ _planner_rpc = frontend.Planner()
+ _tko_rpc = frontend.TKO()
+
+ def __init__(self, plan_id, server, label_name):
+ self._plan_id = plan_id
+ self._server = server
+ self._afe_rest = rest_client.Resource.load(
+ 'http://%s/afe/server/resources' % server)
+ self._label_name = label_name
def start(self):
@@ -13,4 +29,173 @@
Thread remains in this method until the execution engine is complete.
"""
- pass
+ self._initialize_plan()
+
+ while True:
+ if self._tick():
+ break
+ time.sleep(TICK_INTERVAL_SECS)
+
+ self._cleanup()
+
+
+ def _initialize_plan(self):
+ """
+ Performs actions necessary to start a test plan.
+
+ Adds the hosts into the proper atomic group, and waits for the plan to
+ be ready to start before returning
+ """
+ plan = self._planner_rpc.run('get_plan', id=self._plan_id)
+ name = plan['name'] + '_set_atomic_group'
+ if not self._afe_rest.jobs.get(name=name).total_results:
+ self._launch_set_atomic_group_job(name)
+
+ self._wait_for_initialization()
+
+
+ def _launch_set_atomic_group_job(self, name):
+ """
+ Launch the job to set the hosts' atomic group, and initate the plan
+
+ If the hosts are already part of an atomic group, wait for a tick and
+ try again. Return when successful
+ """
+ while True:
+ hosts = self._planner_rpc.run('get_hosts', plan_id=self._plan_id)
+ control = self._planner_rpc.run('get_atomic_group_control_file')
+
+ info = self._afe_rest.execution_info.get().execution_info
+ info['control_file'] = control
+ info['cleanup_before_job'] = afe_model_attributes.RebootBefore.NEVER
+ info['cleanup_after_job'] = afe_model_attributes.RebootAfter.NEVER
+ info['run_verify'] = False
+ info['machines_per_execution'] = len(hosts)
+
+ entries = self._afe_rest.queue_entries_request.get(
+ hosts=hosts).queue_entries
+
+ keyvals = {'server': self._server,
+ 'label_name': self._label_name,
+ 'plan_id': self._plan_id}
+
+ job_req = {'name' : name,
+ 'execution_info' : info,
+ 'queue_entries' : entries,
+ 'keyvals' : keyvals}
+
+ try:
+ self._afe_rest.jobs.post(job_req)
+ logging.info('created job to set atomic group')
+ break
+ except rest_client.ClientError, e:
+ logging.info('hosts already in atomic group')
+ logging.info('(error was %s)' % e.message)
+ logging.info('waiting...')
+ time.sleep(TICK_INTERVAL_SECS)
+
+
+ def _wait_for_initialization(self):
+ while True:
+ plan = self._planner_rpc.run('get_plan', id=self._plan_id)
+ if plan['initialized']:
+ break
+ logging.info('waiting for initialization...')
+ time.sleep(TICK_INTERVAL_SECS)
+
+
+ def _cleanup(self):
+ self._afe_rest.labels.get(name=self._label_name).members[0].delete()
+
+
+ def _tick(self):
+ """
+ Processes one tick of the execution engine.
+
+ Returns True if the engine has completed the plan.
+ """
+ logging.info('tick')
+ self._process_finished_runs()
+ self._check_tko_jobs()
+ return self._schedule_new_runs()
+
+
+ def _process_finished_runs(self):
+ """
+ Finalize the test runs that have finished.
+
+ Look for runs that are in PASSED or FAILED, perform any additional
+ processing required, and set the entry to 'finalized'.
+ """
+ Status = model_attributes.TestRunStatus
+ runs = self._planner_rpc.run('get_test_runs', plan__id=self._plan_id,
+ status__in=(Status.PASSED, Status.FAILED),
+ finalized=False)
+ for run in runs:
+ logging.info('finalizing test run %s', run)
+ if run['status'] == Status.FAILED:
+ self._planner_rpc.run('modify_host', id=run['host'],
+ blocked=True)
+ self._planner_rpc.run('modify_test_run', id=run['id'],
+ finalized=True)
+
+
+ def _check_tko_jobs(self):
+ """
+ Instructs the server to update the Planner test runs table
+
+ Sends an RPC to have the server pull the proper TKO tests and add them
+ to the Planner tables. Logs information about what was added.
+ """
+ test_runs_updated = self._planner_rpc.run('update_test_runs',
+ plan_id=self._plan_id)
+ for update in test_runs_updated:
+ logging.info('added %s test run for tko test id %s (%s)',
+ update['status'], update['tko_test_idx'],
+ update['hostname'])
+
+
+ def _schedule_new_runs(self):
+ next_configs = self._planner_rpc.run('get_next_test_configs',
+ plan_id=self._plan_id)
+ if next_configs['complete']:
+ return True
+
+ for config in next_configs['next_configs']:
+ self._run_job(hostname=config['host'],
+ test_config_id=config['next_test_config_id'])
+
+ return False
+
+
+ def _run_job(self, hostname, test_config_id):
+ test_config = self._planner_rpc.run('get_test_config',
+ id=test_config_id)
+
+ info = self._afe_rest.execution_info.get().execution_info
+ info['control_file'] = test_config['control_file']['contents']
+ info['is_server'] = test_config['is_server']
+
+ atomic_group_class = self._afe_rest.labels.get(
+ name=self._label_name).members[0].get().atomic_group_class.href
+
+ request = self._afe_rest.queue_entries_request.get(
+ hosts=(hostname,), atomic_group_class=atomic_group_class)
+ entries = request.queue_entries
+
+ plan = self._planner_rpc.run('get_plan', id=self._plan_id)
+ prefix = plan['label_override']
+ if prefix is None:
+ prefix = plan['name']
+ job_req = {'name' : '%s_%s_%s' % (prefix, test_config['alias'],
+ hostname),
+ 'execution_info' : info,
+ 'queue_entries' : entries}
+
+ logging.info('starting test alias %s for host %s',
+ test_config['alias'], hostname)
+ job = self._afe_rest.jobs.post(job_req)
+ self._planner_rpc.run('add_job',
+ plan_id=self._plan_id,
+ test_config_id=test_config_id,
+ afe_job_id=job.get().id)