Add framework for Test Planner execution engine, and the supporting RPC
interfaces

Signed-off-by: James Ren <jamesren@google.com>


git-svn-id: http://test.kernel.org/svn/autotest/trunk@4260 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/frontend/afe/resources.py b/frontend/afe/resources.py
index 835cc83..bc26e06 100644
--- a/frontend/afe/resources.py
+++ b/frontend/afe/resources.py
@@ -587,6 +587,7 @@
     @classmethod
     def add_query_selectors(cls, query_processor):
         query_processor.add_field_selector('id')
+        query_processor.add_field_selector('name')
         query_processor.add_selector(
                 query_lib.Selector('status',
                                    doc='One of queued, active or complete'),
diff --git a/frontend/migrations/049_test_planner_additions.py b/frontend/migrations/049_test_planner_additions.py
new file mode 100644
index 0000000..21fc8ea
--- /dev/null
+++ b/frontend/migrations/049_test_planner_additions.py
@@ -0,0 +1,36 @@
+UP_SQL = """\
+BEGIN;
+
+SET storage_engine = InnoDB;
+
+CREATE TABLE `planner_plan_host_labels` (
+    `id` integer AUTO_INCREMENT NOT NULL PRIMARY KEY,
+    `plan_id` integer NOT NULL,
+    `label_id` integer NOT NULL
+)
+;
+ALTER TABLE `planner_plan_host_labels` ADD CONSTRAINT plan_host_labels_plan_id_fk FOREIGN KEY (`plan_id`) REFERENCES `planner_plans` (`id`);
+ALTER TABLE `planner_plan_host_labels` ADD CONSTRAINT plan_host_labels_label_id_fk FOREIGN KEY (`label_id`) REFERENCES `afe_labels` (`id`);
+
+
+ALTER TABLE `planner_tests` ADD COLUMN `alias` varchar(255) NOT NULL;
+ALTER TABLE `planner_tests` ADD CONSTRAINT `tests_plan_id_alias_unique` UNIQUE KEY (`plan_id`, `alias`);
+
+
+ALTER TABLE `planner_tests` ADD COLUMN `estimated_runtime` int NOT NULL;
+
+
+ALTER TABLE `planner_test_runs` ADD COLUMN `host_id` int NOT NULL;
+ALTER TABLE `planner_test_runs` ADD CONSTRAINT `test_runs_host_id_fk` FOREIGN KEY (`host_id`) REFERENCES `planner_hosts` (`id`);
+
+COMMIT;
+"""
+
+DOWN_SQL = """\
+ALTER TABLE `planner_tests` DROP KEY `tests_plan_id_alias_unique`;
+ALTER TABLE `planner_tests` DROP COLUMN `alias`;
+ALTER TABLE `planner_tests` DROP COLUMN `estimated_runtime`;
+ALTER TABLE `planner_test_runs` DROP FOREIGN KEY `test_runs_host_id_fk`;
+ALTER TABLE `planner_test_runs` DROP COLUMN `host_id`;
+DROP TABLE IF EXISTS `planner_plan_host_labels`;
+"""
diff --git a/frontend/planner/execution_engine.py b/frontend/planner/execution_engine.py
new file mode 100644
index 0000000..30016c9
--- /dev/null
+++ b/frontend/planner/execution_engine.py
@@ -0,0 +1,16 @@
+class ExecutionEngine(object):
+    """
+    Test Planner execution engine for one plan; start() is still a stub
+    """
+
+    def __init__(self, plan_id):
+        self.plan_id = plan_id  # id of the plan this engine is created for
+
+
+    def start(self):
+        """
+        Starts the execution engine (placeholder: no work is done yet).
+
+        Intended to block the calling thread until the engine is complete.
+        """
+        pass
diff --git a/frontend/planner/execution_engine_control.srv b/frontend/planner/execution_engine_control.srv
new file mode 100644
index 0000000..d4bb615
--- /dev/null
+++ b/frontend/planner/execution_engine_control.srv
@@ -0,0 +1,61 @@
+import time
+from autotest_lib.frontend import setup_django_environment
+from autotest_lib.client.common_lib import utils
+from autotest_lib.frontend.afe import models as afe_models
+from autotest_lib.frontend.shared import rest_client
+from autotest_lib.frontend.planner import execution_engine
+from autotest_lib.server import frontend
+
+
+TICK_INTERVAL_SECS = 10
+
+keyvals = utils.read_keyval(job.resultdir)
+
+planner_rpc = frontend.Planner()
+afe_rest = rest_client.Resource.load(
+        'http://%s/afe/server/resources' % keyvals['server'])
+
+
+def _launch_set_atomic_group_job(plan, name):  # NOTE: plan is unused here
+    """Launch the job to set the hosts' atomic group, and initiate the plan"""
+    hosts = planner_rpc.run('get_hosts', plan_id=keyvals['plan_id'])
+    control = planner_rpc.run('get_atomic_group_control_file')
+
+    info = afe_rest.execution_info.get().execution_info
+    info['control_file'] = control
+    info['cleanup_before_job'] = afe_models.RebootBefore.NEVER
+    info['cleanup_after_job'] = afe_models.RebootAfter.NEVER
+    info['run_verify'] = False
+    info['machines_per_execution'] = len(hosts)
+
+    entries = afe_rest.queue_entries_request.get(hosts=hosts).queue_entries
+
+    job_req = {'name' : name,
+               'execution_info' : info,
+               'queue_entries' : entries,
+               'keyvals' : keyvals}  # forwards this job's own keyvals
+
+    afe_rest.jobs.post(job_req)
+
+
+# Check if the plan is already being initialized, and launch the initialization
+# job if not
+plan = planner_rpc.run('get_plan', id=keyvals['plan_id'])
+name = plan['name'] + '_set_atomic_group'
+if not afe_rest.jobs.get(name=name).total_results:
+    _launch_set_atomic_group_job(plan, name)
+
+
+# Wait for the plan to be initialized
+while True:
+    if planner_rpc.run('get_plan', id=keyvals['plan_id'])['initialized']:
+        break
+    time.sleep(TICK_INTERVAL_SECS)
+
+
+# Execution engine main loop
+execution_engine.ExecutionEngine(plan_id=keyvals['plan_id']).start()
+
+
+# Cleanup
+afe_rest.labels.get(name=keyvals['label_name']).members[0].delete()
diff --git a/frontend/planner/models.py b/frontend/planner/models.py
index 7f62471..2236b11 100644
--- a/frontend/planner/models.py
+++ b/frontend/planner/models.py
@@ -6,7 +6,7 @@
 from autotest_lib.client.common_lib import enum, utils
 
 
-class Plan(dbmodels.Model):
+class Plan(dbmodels.Model, model_logic.ModelExtensions):
     """A test plan
 
     Required:
@@ -30,6 +30,10 @@
     owners = dbmodels.ManyToManyField(afe_models.User,
                                       db_table='planner_plan_owners')
     hosts = dbmodels.ManyToManyField(afe_models.Host, through='Host')
+    host_labels = dbmodels.ManyToManyField(afe_models.Label,
+                                           db_table='planner_plan_host_labels')
+
+    name_field = 'name'
 
     class Meta:
         db_table = 'planner_plans'
@@ -64,7 +68,7 @@
                 'Subclasses must override _get_details_unicode()')
 
 
-class Host(ModelWithPlan):
+class Host(ModelWithPlan, model_logic.ModelExtensions):
     """A plan host
 
     Required:
@@ -76,14 +80,29 @@
     complete = dbmodels.BooleanField(default=False)
     blocked = dbmodels.BooleanField(default=False)
 
+    Status = enum.Enum('Finished', 'Running', 'Blocked', string_values=True)
+
     class Meta:
         db_table = 'planner_hosts'
 
 
+    def status(self):
+        if self.complete:
+            return Host.Status.FINISHED
+        if self.blocked:
+            return Host.Status.BLOCKED
+        return Host.Status.RUNNING
+
+
     def _get_details_unicode(self):
-        return 'Host: %s' % host.hostname
+        return 'Host: %s' % self.host.hostname
 
 
+    @classmethod
+    def smart_get(cls, id):
+        raise NotImplementedError('Planner hosts do not support smart_get()')
+
+
 class ControlFile(model_logic.ModelWithHash):
     """A control file. Immutable once added to the table
 
@@ -113,16 +132,23 @@
     """A planned test
 
     Required:
+        alias: The name to give this test within the plan. Unique with plan id
         test_control_file: The control file to run
         execution_order: An integer describing when this test should be run in
                          the test plan
+        estimated_runtime: Time in hours that the test is expected to run. Will
+                           be automatically generated (on the frontend) for
+                           tests in Autotest.
     """
+    alias = dbmodels.CharField(max_length=255)
     control_file = dbmodels.ForeignKey(ControlFile)
     execution_order = dbmodels.IntegerField(blank=True)
+    estimated_runtime = dbmodels.IntegerField()
 
     class Meta:
         db_table = 'planner_tests'
         ordering = ('execution_order',)
+        unique_together = (('plan', 'alias'),)
 
 
     def _get_details_unicode(self):
@@ -187,6 +213,7 @@
 
     test_job = dbmodels.ForeignKey(Job)
     tko_test = dbmodels.ForeignKey(tko_models.Test)
+    host = dbmodels.ForeignKey(Host)
     status = dbmodels.CharField(max_length=16, choices=Status.choices())
     finalized = dbmodels.BooleanField(default=False)
     seen = dbmodels.BooleanField(default=False)
diff --git a/frontend/planner/planner_test_utils.py b/frontend/planner/planner_test_utils.py
new file mode 100644
index 0000000..6aa2d3b
--- /dev/null
+++ b/frontend/planner/planner_test_utils.py
@@ -0,0 +1,26 @@
+import common
+from autotest_lib.frontend.afe import frontend_test_utils
+from autotest_lib.frontend.afe import models as afe_models
+from autotest_lib.frontend.planner import models
+from autotest_lib.client.common_lib import utils
+
+class PlannerTestMixin(frontend_test_utils.FrontendTestMixin):
+    _PLAN_NAME = 'plan'
+
+    def _planner_common_setup(self):
+        self._frontend_common_setup()
+
+        plan = models.Plan.objects.create(name=self._PLAN_NAME)
+        models.Host.objects.create(
+                plan=plan, host=afe_models.Host.objects.get(hostname='host1'))
+        models.Host.objects.create(
+                plan=plan, host=afe_models.Host.objects.get(hostname='host2'))
+        plan.host_labels.add(afe_models.Label.objects.get(name='label1'))
+        plan.save()
+
+        self._plan = plan
+
+
+    def _planner_common_teardown(self):
+        self._plan.delete()
+        self._frontend_common_teardown()
diff --git a/frontend/planner/rpc_interface.py b/frontend/planner/rpc_interface.py
index b8ea992..eaf4de9 100644
--- a/frontend/planner/rpc_interface.py
+++ b/frontend/planner/rpc_interface.py
@@ -5,4 +5,106 @@
 __author__ = 'jamesren@google.com (James Ren)'
 
 
-# Nothing yet
+import os
+import common
+from django.db import models as django_models
+from autotest_lib.frontend import thread_local
+from autotest_lib.frontend.afe import model_logic, models as afe_models
+from autotest_lib.frontend.afe import rpc_utils as afe_rpc_utils
+from autotest_lib.frontend.planner import models, rpc_utils
+from autotest_lib.client.common_lib import utils
+
+# basic getter/setter calls
+# TODO: deprecate the basic calls and reimplement them in the REST framework
+
+def get_plan(id):
+    return afe_rpc_utils.prepare_for_serialization(
+            models.Plan.smart_get(id).get_object_dict())
+
+
+def modify_plan(id, **data):
+    models.Plan.smart_get(id).update_object(data)
+
+
+# more advanced calls
+
+def submit_plan(name, hosts, host_labels, tests,
+                support=None, label_override=None):
+    """
+    Submits a plan to the Test Planner
+
+    @param name: the name of the plan
+    @param hosts: a list of hostnames
+    @param host_labels: a list of host labels. The hosts under test will update
+                        to reflect changes in the label
+    @param tests: a list of test control files to run (not used yet)
+    @param support: the global support object
+    @param label_override: label to prepend to all AFE jobs for this test plan.
+                           Defaults to the plan name.
+    @return: the id of the newly created plan
+    @raise ValidationError: unknown host or label, or plan name already used
+    """
+    host_objects = []
+    label_objects = []
+
+    for host in hosts or []:
+        try:
+            host_objects.append(
+                    afe_models.Host.valid_objects.get(hostname=host))
+        except afe_models.Host.DoesNotExist:
+            raise model_logic.ValidationError(
+                    {'hosts': 'host %s does not exist' % host})
+
+    for label in host_labels or []:
+        try:
+            label_objects.append(afe_models.Label.valid_objects.get(name=label))
+        except afe_models.Label.DoesNotExist:
+            raise model_logic.ValidationError(
+                    {'host_labels': 'host label %s does not exist' % label})
+
+    plan, created = models.Plan.objects.get_or_create(name=name)
+    if not created:
+        raise model_logic.ValidationError(
+                {'name': 'Plan name %s already exists' % name})
+
+    try:
+        label = rpc_utils.create_plan_label(plan)
+    except:  # deliberately bare: undo the new plan on any failure, re-raise
+        plan.delete()
+        raise
+
+    plan.label_override = label_override
+    plan.support = support or ''
+    plan.save()
+    plan.owners.add(afe_models.User.current_user())
+
+    for host in host_objects:
+        models.Host.objects.create(plan=plan, host=host)
+    plan.host_labels.add(*label_objects)
+
+    rpc_utils.start_plan(plan, label)
+
+    return plan.id
+
+
+def get_hosts(plan_id):
+    """
+    Returns the hostnames of every host that belongs to the given plan.
+
+    Hosts come from the plan itself and from each of its host labels.
+    """
+    plan = models.Plan.smart_get(plan_id)
+    hostnames = set(plan.hosts.all().values_list('hostname', flat=True))
+    for host_label in plan.host_labels.all():
+        labeled = host_label.host_set.all().values_list('hostname', flat=True)
+        hostnames.update(labeled)
+
+    return afe_rpc_utils.prepare_for_serialization(hostnames)
+
+
+def get_atomic_group_control_file():
+    """
+    Gets the control file to apply the atomic group for a set of machines
+    """
+    return rpc_utils.lazy_load(os.path.join(os.path.dirname(__file__),
+                                            'set_atomic_group_control.srv'))
diff --git a/frontend/planner/rpc_interface_unittest.py b/frontend/planner/rpc_interface_unittest.py
new file mode 100644
index 0000000..0f1c71b
--- /dev/null
+++ b/frontend/planner/rpc_interface_unittest.py
@@ -0,0 +1,76 @@
+#!/usr/bin/python
+
+import unittest
+import common
+from autotest_lib.frontend import setup_django_environment
+from autotest_lib.frontend.planner import planner_test_utils
+from autotest_lib.frontend.afe import model_logic
+from autotest_lib.frontend.afe import models as afe_models
+from autotest_lib.frontend.planner import rpc_interface, models, rpc_utils
+
+
+class RpcInterfaceTest(unittest.TestCase,
+                       planner_test_utils.PlannerTestMixin):
+    def setUp(self):
+        self._planner_common_setup()
+        self.god.stub_function(rpc_utils, 'start_plan')
+
+
+    def tearDown(self):
+        self._planner_common_teardown()
+
+
+    def test_submit_plan_success(self):
+        hosts = ('host1', 'host2')
+        plan_name = self._PLAN_NAME + '2'
+
+        rpc_utils.start_plan.expect_any_call()
+        rpc_interface.submit_plan(plan_name, hosts, ('label1',), ())
+
+        plan = models.Plan.objects.get(name=plan_name)
+        self.assertEqual(
+                set(afe_models.Host.objects.filter(hostname__in=hosts)),
+                set(plan.hosts.all()))
+
+        self.assertEqual(1, plan.host_labels.all().count())
+        self.assertEqual(afe_models.Label.objects.get(name='label1'),
+                         plan.host_labels.all()[0])
+        self.god.check_playback()
+
+
+    def test_submit_plan_duplicate(self):
+        self.assertRaises(
+                model_logic.ValidationError, rpc_interface.submit_plan,
+                self._PLAN_NAME, (), (), ())
+
+
+    def test_submit_plan_bad_host(self):
+        self.assertRaises(
+                model_logic.ValidationError, rpc_interface.submit_plan,
+                self._PLAN_NAME + '2', ('fakehost',), (), ())  # 1-tuple
+
+
+    def test_submit_plan_bad_label(self):
+        self.assertRaises(
+                model_logic.ValidationError, rpc_interface.submit_plan,
+                self._PLAN_NAME + '2', (), ('fakelabel',), ())  # 1-tuple
+
+
+    def test_get_hosts(self):
+        hosts = rpc_interface.get_hosts(self._PLAN_NAME)
+        self.assertEqual(set(('host1', 'host2')), set(hosts))
+
+        afe_models.Host.objects.get(hostname='host3').labels.add(
+                afe_models.Label.objects.get(name='label1'))
+
+        hosts = rpc_interface.get_hosts(self._PLAN_NAME)
+        self.assertEqual(set(('host1', 'host2', 'host3')), set(hosts))
+
+        afe_models.Host.objects.get(hostname='host3').labels.clear()
+
+        hosts = rpc_interface.get_hosts(self._PLAN_NAME)
+        self.assertEqual(set(('host1', 'host2')), set(hosts))
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/frontend/planner/rpc_utils.py b/frontend/planner/rpc_utils.py
new file mode 100644
index 0000000..8d359bd
--- /dev/null
+++ b/frontend/planner/rpc_utils.py
@@ -0,0 +1,73 @@
+import common
+import os
+from autotest_lib.frontend.afe import models as afe_models, model_logic
+from autotest_lib.frontend.planner import models
+from autotest_lib.frontend.shared import rest_client
+from autotest_lib.client.common_lib import global_config, utils
+
+
+PLANNER_LABEL_PREFIX = 'planner_'
+PLANNER_ATOMIC_GROUP_NAME = 'planner_global_atomic_group'
+SERVER = global_config.global_config.get_config_value('SERVER', 'hostname')
+LAZY_LOADED_FILES = {}
+
+
+def create_plan_label(plan):
+    """
+    Creates the host label to apply on the plan hosts
+    """
+    group, _ = afe_models.AtomicGroup.objects.get_or_create(
+            name=PLANNER_ATOMIC_GROUP_NAME)
+    if group.invalid:
+        group.invalid = False
+        group.save()
+
+    name = PLANNER_LABEL_PREFIX + plan.name
+    if bool(afe_models.Label.valid_objects.filter(name=name)):
+        raise model_logic.ValidationError('Label %s already exists, '
+                                          'cannot start plan' % name)
+    label = afe_models.Label(name=name, atomic_group=group)
+    label.save()
+
+    return label
+
+
+def start_plan(plan, label):
+    """
+    Takes the necessary steps to start a test plan in Autotest
+    """
+    afe_rest = rest_client.Resource.load(
+            'http://%s/afe/server/resources' % SERVER)
+
+    keyvals = {'server': SERVER,
+               'plan_id': plan.id,
+               'label_name': label.name}
+
+    info = afe_rest.execution_info.get().execution_info
+    info['control_file'] = _get_execution_engine_control()
+    info['machines_per_execution'] = None
+
+    job_req = {'name': plan.name + '_execution_engine',
+               'execution_info': info,
+               'queue_entries': (),
+               'keyvals': keyvals}
+
+    afe_rest.jobs.post(job_req)
+
+
+def _get_execution_engine_control():
+    """
+    Gets the control file to run the execution engine
+    """
+    return lazy_load(os.path.join(os.path.dirname(__file__),
+                                  'execution_engine_control.srv'))
+
+
+def lazy_load(path):
+    """
+    Returns the contents of the file at path; reads it once, then caches it
+    """
+    try:
+        return LAZY_LOADED_FILES[path]
+    except KeyError:
+        return LAZY_LOADED_FILES.setdefault(path, utils.read_file(path))
diff --git a/frontend/planner/rpc_utils_unittest.py b/frontend/planner/rpc_utils_unittest.py
new file mode 100644
index 0000000..d168c05
--- /dev/null
+++ b/frontend/planner/rpc_utils_unittest.py
@@ -0,0 +1,69 @@
+#!/usr/bin/python
+
+import unittest
+import common
+from autotest_lib.frontend import setup_django_environment
+from autotest_lib.frontend.planner import planner_test_utils
+from autotest_lib.frontend.afe import model_logic, models as afe_models
+from autotest_lib.frontend.afe import rpc_interface as afe_rpc_interface
+from autotest_lib.frontend.planner import models, rpc_utils
+from autotest_lib.client.common_lib import utils
+
+
+class RpcUtilsTest(unittest.TestCase,
+                   planner_test_utils.PlannerTestMixin):
+    def setUp(self):
+        self._planner_common_setup()
+
+
+    def tearDown(self):
+        self._planner_common_teardown()
+
+
+    def test_create_plan_label(self):
+        label, group = self._create_label_helper()
+
+        label.delete()
+        group.invalid = True
+        group.save()
+
+        label, group = self._create_label_helper()
+
+        self.assertRaises(model_logic.ValidationError,
+                          rpc_utils.create_plan_label, self._plan)
+
+
+    def _create_label_helper(self):
+        label = rpc_utils.create_plan_label(self._plan)
+        group = afe_models.AtomicGroup.objects.get(
+                name=rpc_utils.PLANNER_ATOMIC_GROUP_NAME)
+        self.assertFalse(group.invalid)
+        self.assertEqual(label.atomic_group, group)
+
+        return (label, group)
+
+
+    def test_lazy_load(self):
+        self.god.stub_function(utils, 'read_file')
+
+        DUMMY_PATH_1 = object()
+        DUMMY_PATH_2 = object()
+        DUMMY_FILE_1 = object()
+        DUMMY_FILE_2 = object()
+
+        utils.read_file.expect_call(DUMMY_PATH_1).and_return(DUMMY_FILE_1)
+        self.assertEqual(DUMMY_FILE_1, rpc_utils.lazy_load(DUMMY_PATH_1))
+        self.god.check_playback()
+
+        # read_file should not be called again for this path
+        self.assertEqual(DUMMY_FILE_1, rpc_utils.lazy_load(DUMMY_PATH_1))
+        self.god.check_playback()
+
+        # new file; read_file must be called again
+        utils.read_file.expect_call(DUMMY_PATH_2).and_return(DUMMY_FILE_2)
+        self.assertEqual(DUMMY_FILE_2, rpc_utils.lazy_load(DUMMY_PATH_2))
+        self.god.check_playback()
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/frontend/planner/set_atomic_group_control.srv b/frontend/planner/set_atomic_group_control.srv
new file mode 100644
index 0000000..9d177c8
--- /dev/null
+++ b/frontend/planner/set_atomic_group_control.srv
@@ -0,0 +1,19 @@
+from autotest_lib.client.common_lib import utils
+from autotest_lib.frontend.shared import rest_client
+from autotest_lib.server import frontend
+
+keyvals = utils.read_keyval(job.resultdir)
+
+planner_rpc = frontend.Planner()
+afe_rest = rest_client.Resource.load(
+        'http://%s/afe/server/resources' % keyvals['server'])
+
+
+label = afe_rest.labels.get(name=keyvals['label_name']).members[0].get()
+
+for machine in machines:
+    hostname = hosts.create_host(machine).hostname
+    host = afe_rest.hosts.get(hostname=hostname).members[0]
+    label.hosts.post({'host': host})
+
+planner_rpc.run('modify_plan', id=keyvals['plan_id'], initialized=True)