Implementation of the Test Planner execution engine. It is currently able to
schedule single-host tests and place the results in the proper Planner
tables for analysis.

TODO: global support object, execution_engine.py unit tests
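
As a rough usage sketch (illustrative only, not part of this change), one
cycle of the execution engine could drive the new RPCs along these lines.
schedule_afe_job() is a hypothetical placeholder for whatever creates the AFE
job for a host/control file pair; the real loop in execution_engine.py may
differ and may go through the RPC proxy rather than importing rpc_interface
directly:

    from autotest_lib.frontend.planner import rpc_interface

    def run_cycle(plan_id):
        # Pull newly started or finished TKO tests into the Planner tables
        rpc_interface.update_test_runs(plan_id)

        # Ask the planner which test config each host should run next
        result = rpc_interface.get_next_test_configs(plan_id)
        if result['complete']:
            return

        for config in result['next_configs']:
            test_config = rpc_interface.get_test_config(
                    config['next_test_config_id'])
            # Hypothetical helper: schedule the AFE job for this host
            afe_job_id = schedule_afe_job(config['host'], test_config)
            rpc_interface.add_job(plan_id, config['next_test_config_id'],
                                  afe_job_id)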

Signed-off-by: James Ren <jamesren@google.com>


git-svn-id: http://test.kernel.org/svn/autotest/trunk@4301 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/frontend/planner/rpc_interface.py b/frontend/planner/rpc_interface.py
index eaf4de9..219436f 100644
--- a/frontend/planner/rpc_interface.py
+++ b/frontend/planner/rpc_interface.py
@@ -11,6 +11,7 @@
 from autotest_lib.frontend import thread_local
 from autotest_lib.frontend.afe import model_logic, models as afe_models
 from autotest_lib.frontend.afe import rpc_utils as afe_rpc_utils
+from autotest_lib.frontend.tko import models as tko_models
 from autotest_lib.frontend.planner import models, rpc_utils
 from autotest_lib.client.common_lib import utils
 
@@ -26,6 +27,32 @@
     models.Plan.smart_get(id).update_object(data)
 
 
+def get_test_runs(**filter_data):
+    return afe_rpc_utils.prepare_for_serialization(
+            [test_run.get_object_dict() for test_run
+             in models.TestRun.objects.filter(**filter_data)])
+
+
+def modify_test_run(id, **data):
+    models.TestRun.objects.get(id=id).update_object(data)
+
+
+def modify_host(id, **data):
+    models.Host.objects.get(id=id).update_object(data)
+
+
+def get_test_config(id):
+    return afe_rpc_utils.prepare_rows_as_nested_dicts(
+            models.TestConfig.objects.filter(id=id), ('control_file',))[0]
+
+
+def add_job(plan_id, test_config_id, afe_job_id):
+    models.Job.objects.create(
+            plan=models.Plan.objects.get(id=plan_id),
+            test_config=models.TestConfig.objects.get(id=test_config_id),
+            afe_job=afe_models.Job.objects.get(id=afe_job_id))
+
+
 # more advanced calls
 
 def submit_plan(name, hosts, host_labels, tests,
@@ -37,7 +64,12 @@
     @param hosts: a list of hostnames
     @param host_labels: a list of host labels. The hosts under test will update
                         to reflect changes in the label
-    @param tests: a list of test control files to run
+    @param tests: an ordered list of dictionaries:
+                      alias: an alias for the test
+                      control_file: the test control file
+                      is_server: True if this is a server-side control file
+                      estimated_runtime: estimated number of hours this test
+                                         will run
     @param support: the global support object
     @param label_override: label to prepend to all AFE jobs for this test plan.
                            Defaults to the plan name.
@@ -60,6 +92,21 @@
             raise model_logic.ValidationError(
                     {'host_labels': 'host label %s does not exist' % label})
 
+    aliases_seen = set()
+    test_required_fields = (
+            'alias', 'control_file', 'is_server', 'estimated_runtime')
+    for test in tests:
+        for field in test_required_fields:
+            if field not in test:
+                raise model_logic.ValidationError(
+                        {'tests': 'field %s is required' % field})
+
+        alias = test['alias']
+        if alias in aliases_seen:
+            raise model_logic.ValidationError(
+                    {'tests': 'alias %s occurs more than once' % alias})
+        aliases_seen.add(alias)
+
     plan, created = models.Plan.objects.get_or_create(name=name)
     if not created:
         raise model_logic.ValidationError(
@@ -67,25 +114,36 @@
 
     try:
         label = rpc_utils.create_plan_label(plan)
+        try:
+            for i, test in enumerate(tests):
+                control, _ = models.ControlFile.objects.get_or_create(
+                        contents=test['control_file'])
+                models.TestConfig.objects.create(
+                        plan=plan, alias=test['alias'], control_file=control,
+                        is_server=test['is_server'], execution_order=i,
+                        estimated_runtime=test['estimated_runtime'])
+
+            plan.label_override = label_override
+            plan.support = support or ''
+            plan.save()
+
+            plan.owners.add(afe_models.User.current_user())
+
+            for host in host_objects:
+                models.Host.objects.create(plan=plan, host=host)
+
+            plan.host_labels.add(*label_objects)
+
+            rpc_utils.start_plan(plan, label)
+
+            return plan.id
+        except:
+            label.delete()
+            raise
     except:
         plan.delete()
         raise
 
-    plan.label_override = label_override
-    plan.support = support or ''
-    plan.save()
-
-    plan.owners.add(afe_models.User.current_user())
-
-    for host in host_objects:
-        planner_host = models.Host.objects.create(plan=plan, host=host)
-
-    plan.host_labels.add(*label_objects)
-
-    rpc_utils.start_plan(plan, label)
-
-    return plan.id
-
 
 def get_hosts(plan_id):
     """
@@ -108,3 +166,72 @@
     """
     return rpc_utils.lazy_load(os.path.join(os.path.dirname(__file__),
                                             'set_atomic_group_control.srv'))
+
+
+def get_next_test_configs(plan_id):
+    """
+    Gets information about the next planner test configs that need to be run
+
+    @param plan_id: the ID or name of the test plan
+    @return a dictionary:
+                complete: True or False, whether the test plan is complete
+                next_configs: a list of dictionaries:
+                    host: hostname of the host
+                    next_test_config_id: ID of the next test config to run
+    """
+    plan = models.Plan.smart_get(plan_id)
+
+    result = {'next_configs': []}
+
+    rpc_utils.update_hosts_table(plan)
+    for host in models.Host.objects.filter(plan=plan):
+        next_test_config_id = rpc_utils.compute_next_test_config(plan, host)
+        if next_test_config_id:
+            config = {'next_test_config_id': next_test_config_id,
+                      'host': host.host.hostname}
+            result['next_configs'].append(config)
+
+    rpc_utils.check_for_completion(plan)
+    result['complete'] = plan.complete
+
+    return result
+
+
+def update_test_runs(plan_id):
+    """
+    Add all applicable TKO jobs to the Planner DB tables
+
+    Looks for tests in the TKO tables that were started as part of the test
+    plan, and adds them to the Planner tables.
+
+    Also updates the status of a test run if the underlying TKO test moves
+    from an active status to a completed status.
+
+    @return a list of dictionaries:
+                status: the status of the new (or updated) test run
+                tko_test_idx: the ID of the TKO test added
+                hostname: the hostname of the host added
+    """
+    plan = models.Plan.objects.get(id=plan_id)
+    updated = []
+
+    for planner_job in plan.job_set.all():
+        known_statuses = dict((test_run.tko_test.test_idx, test_run.status)
+                              for test_run in planner_job.testrun_set.all())
+        tko_tests_for_job = tko_models.Test.objects.filter(
+                job__afe_job_id=planner_job.afe_job.id)
+
+        for tko_test in tko_tests_for_job:
+            status = rpc_utils.compute_test_run_status(tko_test.status.word)
+            needs_update = (tko_test.test_idx not in known_statuses or
+                            status != known_statuses[tko_test.test_idx])
+            if needs_update:
+                hostnames = tko_test.machine.hostname.split(',')
+                for hostname in hostnames:
+                    rpc_utils.add_test_run(
+                            plan, planner_job, tko_test, hostname, status)
+                    updated.append({'status': status,
+                                    'tko_test_idx': tko_test.test_idx,
+                                    'hostname': hostname})
+
+    return updated
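
For reference, a hypothetical caller of the reworked submit_plan() might pass
the new per-test dictionaries like this (the plan name, hostname, control file
variables and runtimes are made up for illustration; support and
label_override are passed through unchanged):

    plan_id = rpc_interface.submit_plan(
            name='example_plan',
            hosts=['host1.example.com'],
            host_labels=[],
            tests=[{'alias': 'sleeptest',
                    'control_file': SLEEPTEST_CONTROL_TEXT,
                    'is_server': False,
                    'estimated_runtime': 1},
                   {'alias': 'dbench',
                    'control_file': DBENCH_CONTROL_TEXT,
                    'is_server': False,
                    'estimated_runtime': 2}],
            support=None,
            label_override=None)

Each alias must be unique within a plan and all four fields are required;
otherwise submit_plan() raises model_logic.ValidationError.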