Implementation of Test Planner execution engine. Is currently able to
schedule single-host tests and place the results in the proper planner
tables for analysis.
TODO: global support object, execution_engine.py unit tests
Signed-off-by: James Ren <jamesren@google.com>
git-svn-id: http://test.kernel.org/svn/autotest/trunk@4301 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/frontend/planner/rpc_interface.py b/frontend/planner/rpc_interface.py
index eaf4de9..219436f 100644
--- a/frontend/planner/rpc_interface.py
+++ b/frontend/planner/rpc_interface.py
@@ -11,6 +11,7 @@
from autotest_lib.frontend import thread_local
from autotest_lib.frontend.afe import model_logic, models as afe_models
from autotest_lib.frontend.afe import rpc_utils as afe_rpc_utils
+from autotest_lib.frontend.tko import models as tko_models
from autotest_lib.frontend.planner import models, rpc_utils
from autotest_lib.client.common_lib import utils
@@ -26,6 +27,32 @@
models.Plan.smart_get(id).update_object(data)
+def get_test_runs(**filter_data):
+ return afe_rpc_utils.prepare_for_serialization(
+ [test_run.get_object_dict() for test_run
+ in models.TestRun.objects.filter(**filter_data)])
+
+
+def modify_test_run(id, **data):
+ models.TestRun.objects.get(id=id).update_object(data)
+
+
+def modify_host(id, **data):
+ models.Host.objects.get(id=id).update_object(data)
+
+
+def get_test_config(id):
+ return afe_rpc_utils.prepare_rows_as_nested_dicts(
+ models.TestConfig.objects.filter(id=id), ('control_file',))[0]
+
+
+def add_job(plan_id, test_config_id, afe_job_id):
+ models.Job.objects.create(
+ plan=models.Plan.objects.get(id=plan_id),
+ test_config=models.TestConfig.objects.get(id=test_config_id),
+ afe_job=afe_models.Job.objects.get(id=afe_job_id))
+
+
# more advanced calls
def submit_plan(name, hosts, host_labels, tests,
@@ -37,7 +64,12 @@
@param hosts: a list of hostnames
@param host_labels: a list of host labels. The hosts under test will update
to reflect changes in the label
- @param tests: a list of test control files to run
+ @param tests: an ordered list of dictionaries:
+ alias: an alias for the test
+ control_file: the test control file
+ is_server: True if is a server-side control file
+ estimated_runtime: estimated number of hours this test
+ will run
@param support: the global support object
@param label_override: label to prepend to all AFE jobs for this test plan.
Defaults to the plan name.
@@ -60,6 +92,21 @@
raise model_logic.ValidationError(
{'host_labels': 'host label %s does not exist' % label})
+ aliases_seen = set()
+ test_required_fields = (
+ 'alias', 'control_file', 'is_server', 'estimated_runtime')
+ for test in tests:
+ for field in test_required_fields:
+ if field not in test:
+ raise model_logic.ValidationError(
+ {'tests': 'field %s is required' % field})
+
+ alias = test['alias']
+ if alias in aliases_seen:
+ raise model_logic.Validationerror(
+ {'tests': 'alias %s occurs more than once' % alias})
+ aliases_seen.add(alias)
+
plan, created = models.Plan.objects.get_or_create(name=name)
if not created:
raise model_logic.ValidationError(
@@ -67,25 +114,36 @@
try:
label = rpc_utils.create_plan_label(plan)
+ try:
+ for i, test in enumerate(tests):
+ control, _ = models.ControlFile.objects.get_or_create(
+ contents=test['control_file'])
+ models.TestConfig.objects.create(
+ plan=plan, alias=test['alias'], control_file=control,
+ is_server=test['is_server'], execution_order=i,
+ estimated_runtime=test['estimated_runtime'])
+
+ plan.label_override = label_override
+ plan.support = support or ''
+ plan.save()
+
+ plan.owners.add(afe_models.User.current_user())
+
+ for host in host_objects:
+ planner_host = models.Host.objects.create(plan=plan, host=host)
+
+ plan.host_labels.add(*label_objects)
+
+ rpc_utils.start_plan(plan, label)
+
+ return plan.id
+ except:
+ label.delete()
+ raise
except:
plan.delete()
raise
- plan.label_override = label_override
- plan.support = support or ''
- plan.save()
-
- plan.owners.add(afe_models.User.current_user())
-
- for host in host_objects:
- planner_host = models.Host.objects.create(plan=plan, host=host)
-
- plan.host_labels.add(*label_objects)
-
- rpc_utils.start_plan(plan, label)
-
- return plan.id
-
def get_hosts(plan_id):
"""
@@ -108,3 +166,72 @@
"""
return rpc_utils.lazy_load(os.path.join(os.path.dirname(__file__),
'set_atomic_group_control.srv'))
+
+
+def get_next_test_configs(plan_id):
+ """
+ Gets information about the next planner test configs that need to be run
+
+ @param plan_id: the ID or name of the test plan
+ @return a dictionary:
+ complete: True or False, shows test plan completion
+ next_configs: a list of dictionaries:
+ host: ID of the host
+ next_test_config_id: ID of the next Planner test to run
+ """
+ plan = models.Plan.smart_get(plan_id)
+
+ result = {'next_configs': []}
+
+ rpc_utils.update_hosts_table(plan)
+ for host in models.Host.objects.filter(plan=plan):
+ next_test_config_id = rpc_utils.compute_next_test_config(plan, host)
+ if next_test_config_id:
+ config = {'next_test_config_id': next_test_config_id,
+ 'host': host.host.hostname}
+ result['next_configs'].append(config)
+
+ rpc_utils.check_for_completion(plan)
+ result['complete'] = plan.complete
+
+ return result
+
+
+def update_test_runs(plan_id):
+ """
+ Add all applicable TKO jobs to the Planner DB tables
+
+ Looks for tests in the TKO tables that were started as a part of the test
+ plan, and add them to the Planner tables.
+
+ Also updates the status of the test run if the underlying TKO test move from
+ an active status to a completed status.
+
+ @return a list of dictionaries:
+ status: the status of the new (or updated) test run
+ tko_test_idx: the ID of the TKO test added
+ hostname: the host added
+ """
+ plan = models.Plan.objects.get(id=plan_id)
+ updated = []
+
+ for planner_job in plan.job_set.all():
+ known_statuses = dict((test_run.tko_test.test_idx, test_run.status)
+ for test_run in planner_job.testrun_set.all())
+ tko_tests_for_job = tko_models.Test.objects.filter(
+ job__afe_job_id=planner_job.afe_job.id)
+
+ for tko_test in tko_tests_for_job:
+ status = rpc_utils.compute_test_run_status(tko_test.status.word)
+ needs_update = (tko_test.test_idx not in known_statuses or
+ status != known_statuses[tko_test.test_idx])
+ if needs_update:
+ hostnames = tko_test.machine.hostname.split(',')
+ for hostname in hostnames:
+ rpc_utils.add_test_run(
+ plan, planner_job, tko_test, hostname, status)
+ updated.append({'status': status,
+ 'tko_test_idx': tko_test.test_idx,
+ 'hostname': hostname})
+
+ return updated