Add framework for Test Planner execution engine, and the supporting RPC
interfaces
Signed-off-by: James Ren <jamesren@google.com>
git-svn-id: http://test.kernel.org/svn/autotest/trunk@4260 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/frontend/afe/resources.py b/frontend/afe/resources.py
index 835cc83..bc26e06 100644
--- a/frontend/afe/resources.py
+++ b/frontend/afe/resources.py
@@ -587,6 +587,7 @@
@classmethod
def add_query_selectors(cls, query_processor):
query_processor.add_field_selector('id')
+ query_processor.add_field_selector('name')
query_processor.add_selector(
query_lib.Selector('status',
doc='One of queued, active or complete'),
diff --git a/frontend/migrations/049_test_planner_additions.py b/frontend/migrations/049_test_planner_additions.py
new file mode 100644
index 0000000..21fc8ea
--- /dev/null
+++ b/frontend/migrations/049_test_planner_additions.py
@@ -0,0 +1,36 @@
+UP_SQL = """\
+BEGIN;
+
+SET storage_engine = InnoDB;
+
+CREATE TABLE `planner_plan_host_labels` (
+ `id` integer AUTO_INCREMENT NOT NULL PRIMARY KEY,
+ `plan_id` integer NOT NULL,
+ `label_id` integer NOT NULL
+)
+;
+ALTER TABLE `planner_plan_host_labels` ADD CONSTRAINT plan_host_labels_plan_id_fk FOREIGN KEY (`plan_id`) REFERENCES `planner_plans` (`id`);
+ALTER TABLE `planner_plan_host_labels` ADD CONSTRAINT plan_host_labels_label_id_fk FOREIGN KEY (`label_id`) REFERENCES `afe_labels` (`id`);
+
+
+ALTER TABLE `planner_tests` ADD COLUMN `alias` varchar(255) NOT NULL;
+ALTER TABLE `planner_tests` ADD CONSTRAINT `tests_plan_id_alias_unique` UNIQUE KEY (`plan_id`, `alias`);
+
+
+ALTER TABLE `planner_tests` ADD COLUMN `estimated_runtime` int NOT NULL;
+
+
+ALTER TABLE `planner_test_runs` ADD COLUMN `host_id` int NOT NULL;
+ALTER TABLE `planner_test_runs` ADD CONSTRAINT `test_runs_host_id_fk` FOREIGN KEY (`host_id`) REFERENCES `planner_hosts` (`id`);
+
+COMMIT;
+"""
+
+DOWN_SQL = """\
+ALTER TABLE `planner_tests` DROP KEY `tests_plan_id_alias_unique`;
+ALTER TABLE `planner_tests` DROP COLUMN `alias`;
+ALTER TABLE `planner_tests` DROP COLUMN `estimated_runtime`;
+ALTER TABLE `planner_test_runs` DROP FOREIGN KEY `test_runs_host_id_fk`;
+ALTER TABLE `planner_test_runs` DROP COLUMN `host_id`;
+DROP TABLE IF EXISTS `planner_plan_host_labels`;
+"""
diff --git a/frontend/planner/execution_engine.py b/frontend/planner/execution_engine.py
new file mode 100644
index 0000000..30016c9
--- /dev/null
+++ b/frontend/planner/execution_engine.py
@@ -0,0 +1,16 @@
+class ExecutionEngine(object):
+ """
+ Provides the Test Planner execution engine
+ """
+
+ def __init__(self, plan_id):
+ self.plan_id = plan_id
+
+
+ def start(self):
+ """
+ Starts the execution engine.
+
+ Thread remains in this method until the execution engine is complete.
+ """
+ pass
diff --git a/frontend/planner/execution_engine_control.srv b/frontend/planner/execution_engine_control.srv
new file mode 100644
index 0000000..d4bb615
--- /dev/null
+++ b/frontend/planner/execution_engine_control.srv
@@ -0,0 +1,61 @@
+import time
+from autotest_lib.frontend import setup_django_environment
+from autotest_lib.client.common_lib import utils
+from autotest_lib.frontend.afe import models as afe_models
+from autotest_lib.frontend.shared import rest_client
+from autotest_lib.frontend.planner import execution_engine
+from autotest_lib.server import frontend
+
+
+TICK_INTERVAL_SECS = 10
+
+keyvals = utils.read_keyval(job.resultdir)
+
+planner_rpc = frontend.Planner()
+afe_rest = rest_client.Resource.load(
+ 'http://%s/afe/server/resources' % keyvals['server'])
+
+
+def _launch_set_atomic_group_job(plan, name):
+ """Launch the job to set the hosts' atomic group, and initate the plan"""
+ hosts = planner_rpc.run('get_hosts', plan_id=keyvals['plan_id'])
+ control = planner_rpc.run('get_atomic_group_control_file')
+
+ info = afe_rest.execution_info.get().execution_info
+ info['control_file'] = control
+ info['cleanup_before_job'] = afe_models.RebootBefore.NEVER
+ info['cleanup_after_job'] = afe_models.RebootAfter.NEVER
+ info['run_verify'] = False
+ info['machines_per_execution'] = len(hosts)
+
+ entries = afe_rest.queue_entries_request.get(hosts=hosts).queue_entries
+
+ job_req = {'name' : name,
+ 'execution_info' : info,
+ 'queue_entries' : entries,
+ 'keyvals' : keyvals}
+
+ afe_rest.jobs.post(job_req)
+
+
+# Check if the plan is already being initialized, and launch the initialization
+# job if not
+plan = planner_rpc.run('get_plan', id=keyvals['plan_id'])
+name = plan['name'] + '_set_atomic_group'
+if not afe_rest.jobs.get(name=name).total_results:
+ _launch_set_atomic_group_job(plan, name)
+
+
+# Wait for the plan to be initialized
+while True:
+ if planner_rpc.run('get_plan', id=keyvals['plan_id'])['initialized']:
+ break
+ time.sleep(TICK_INTERVAL_SECS)
+
+
+# Execution engine main loop
+execution_engine.ExecutionEngine(plan_id=keyvals['plan_id']).start()
+
+
+# Cleanup
+afe_rest.labels.get(name=keyvals['label_name']).members[0].delete()
diff --git a/frontend/planner/models.py b/frontend/planner/models.py
index 7f62471..2236b11 100644
--- a/frontend/planner/models.py
+++ b/frontend/planner/models.py
@@ -6,7 +6,7 @@
from autotest_lib.client.common_lib import enum, utils
-class Plan(dbmodels.Model):
+class Plan(dbmodels.Model, model_logic.ModelExtensions):
"""A test plan
Required:
@@ -30,6 +30,10 @@
owners = dbmodels.ManyToManyField(afe_models.User,
db_table='planner_plan_owners')
hosts = dbmodels.ManyToManyField(afe_models.Host, through='Host')
+ host_labels = dbmodels.ManyToManyField(afe_models.Label,
+ db_table='planner_plan_host_labels')
+
+ name_field = 'name'
class Meta:
db_table = 'planner_plans'
@@ -64,7 +68,7 @@
'Subclasses must override _get_details_unicode()')
-class Host(ModelWithPlan):
+class Host(ModelWithPlan, model_logic.ModelExtensions):
"""A plan host
Required:
@@ -76,14 +80,29 @@
complete = dbmodels.BooleanField(default=False)
blocked = dbmodels.BooleanField(default=False)
+ Status = enum.Enum('Finished', 'Running', 'Blocked', string_values=True)
+
class Meta:
db_table = 'planner_hosts'
+ def status(self):
+ if self.complete:
+ return Host.Status.FINISHED
+ if self.blocked:
+ return Host.Status.BLOCKED
+ return Host.Status.RUNNING
+
+
def _get_details_unicode(self):
return 'Host: %s' % host.hostname
+ @classmethod
+ def smart_get(cls, id):
+ raise NotImplementedError('Planner hosts do not support smart_get()')
+
+
class ControlFile(model_logic.ModelWithHash):
"""A control file. Immutable once added to the table
@@ -113,16 +132,23 @@
"""A planned test
Required:
+ alias: The name to give this test within the plan. Unique with plan id
test_control_file: The control file to run
execution_order: An integer describing when this test should be run in
the test plan
+ estimated_runtime: Time in hours that the test is expected to run. Will
+ be automatically generated (on the frontend) for
+ tests in Autotest.
"""
+ alias = dbmodels.CharField(max_length=255)
control_file = dbmodels.ForeignKey(ControlFile)
execution_order = dbmodels.IntegerField(blank=True)
+ estimated_runtime = dbmodels.IntegerField()
class Meta:
db_table = 'planner_tests'
ordering = ('execution_order',)
+ unique_together = (('plan', 'alias'),)
def _get_details_unicode(self):
@@ -187,6 +213,7 @@
test_job = dbmodels.ForeignKey(Job)
tko_test = dbmodels.ForeignKey(tko_models.Test)
+ host = dbmodels.ForeignKey(Host)
status = dbmodels.CharField(max_length=16, choices=Status.choices())
finalized = dbmodels.BooleanField(default=False)
seen = dbmodels.BooleanField(default=False)
diff --git a/frontend/planner/planner_test_utils.py b/frontend/planner/planner_test_utils.py
new file mode 100644
index 0000000..6aa2d3b
--- /dev/null
+++ b/frontend/planner/planner_test_utils.py
@@ -0,0 +1,26 @@
+import common
+from autotest_lib.frontend.afe import frontend_test_utils
+from autotest_lib.frontend.afe import models as afe_models
+from autotest_lib.frontend.planner import models
+from autotest_lib.client.common_lib import utils
+
+class PlannerTestMixin(frontend_test_utils.FrontendTestMixin):
+ _PLAN_NAME = 'plan'
+
+ def _planner_common_setup(self):
+ self._frontend_common_setup()
+
+ plan = models.Plan.objects.create(name=self._PLAN_NAME)
+ models.Host.objects.create(
+ plan=plan, host=afe_models.Host.objects.get(hostname='host1'))
+ models.Host.objects.create(
+ plan=plan, host=afe_models.Host.objects.get(hostname='host2'))
+ plan.host_labels.add(afe_models.Label.objects.get(name='label1'))
+ plan.save()
+
+ self._plan = plan
+
+
+ def _planner_common_teardown(self):
+ self._plan.delete()
+ self._frontend_common_teardown()
diff --git a/frontend/planner/rpc_interface.py b/frontend/planner/rpc_interface.py
index b8ea992..eaf4de9 100644
--- a/frontend/planner/rpc_interface.py
+++ b/frontend/planner/rpc_interface.py
@@ -5,4 +5,106 @@
__author__ = 'jamesren@google.com (James Ren)'
-# Nothing yet
+import os
+import common
+from django.db import models as django_models
+from autotest_lib.frontend import thread_local
+from autotest_lib.frontend.afe import model_logic, models as afe_models
+from autotest_lib.frontend.afe import rpc_utils as afe_rpc_utils
+from autotest_lib.frontend.planner import models, rpc_utils
+from autotest_lib.client.common_lib import utils
+
+# basic getter/setter calls
+# TODO: deprecate the basic calls and reimplement them in the REST framework
+
+def get_plan(id):
+ return afe_rpc_utils.prepare_for_serialization(
+ models.Plan.smart_get(id).get_object_dict())
+
+
+def modify_plan(id, **data):
+ models.Plan.smart_get(id).update_object(data)
+
+
+# more advanced calls
+
+def submit_plan(name, hosts, host_labels, tests,
+ support=None, label_override=None):
+ """
+ Submits a plan to the Test Planner
+
+ @param name: the name of the plan
+ @param hosts: a list of hostnames
+ @param host_labels: a list of host labels. The hosts under test will update
+ to reflect changes in the label
+ @param tests: a list of test control files to run
+ @param support: the global support object
+ @param label_override: label to prepend to all AFE jobs for this test plan.
+ Defaults to the plan name.
+ """
+ host_objects = []
+ label_objects = []
+
+ for host in hosts or []:
+ try:
+ host_objects.append(
+ afe_models.Host.valid_objects.get(hostname=host))
+ except afe_models.Host.DoesNotExist:
+ raise model_logic.ValidationError(
+ {'hosts': 'host %s does not exist' % host})
+
+ for label in host_labels or []:
+ try:
+ label_objects.append(afe_models.Label.valid_objects.get(name=label))
+ except afe_models.Label.DoesNotExist:
+ raise model_logic.ValidationError(
+ {'host_labels': 'host label %s does not exist' % label})
+
+ plan, created = models.Plan.objects.get_or_create(name=name)
+ if not created:
+ raise model_logic.ValidationError(
+ {'name': 'Plan name %s already exists' % name})
+
+ try:
+ label = rpc_utils.create_plan_label(plan)
+ except:
+ plan.delete()
+ raise
+
+ plan.label_override = label_override
+ plan.support = support or ''
+ plan.save()
+
+ plan.owners.add(afe_models.User.current_user())
+
+ for host in host_objects:
+ planner_host = models.Host.objects.create(plan=plan, host=host)
+
+ plan.host_labels.add(*label_objects)
+
+ rpc_utils.start_plan(plan, label)
+
+ return plan.id
+
+
+def get_hosts(plan_id):
+ """
+ Gets the hostnames of all the hosts in this test plan.
+
+ Resolves host labels in the plan.
+ """
+ plan = models.Plan.smart_get(plan_id)
+
+ hosts = set(plan.hosts.all().values_list('hostname', flat=True))
+ for label in plan.host_labels.all():
+ hosts.update(label.host_set.all().values_list('hostname', flat=True))
+
+ return afe_rpc_utils.prepare_for_serialization(hosts)
+
+
+def get_atomic_group_control_file():
+ """
+ Gets the control file to apply the atomic group for a set of machines
+ """
+ return rpc_utils.lazy_load(os.path.join(os.path.dirname(__file__),
+ 'set_atomic_group_control.srv'))
diff --git a/frontend/planner/rpc_interface_unittest.py b/frontend/planner/rpc_interface_unittest.py
new file mode 100644
index 0000000..0f1c71b
--- /dev/null
+++ b/frontend/planner/rpc_interface_unittest.py
@@ -0,0 +1,76 @@
+#!/usr/bin/python
+
+import unittest
+import common
+from autotest_lib.frontend import setup_django_environment
+from autotest_lib.frontend.planner import planner_test_utils
+from autotest_lib.frontend.afe import model_logic
+from autotest_lib.frontend.afe import models as afe_models
+from autotest_lib.frontend.planner import rpc_interface, models, rpc_utils
+
+
+class RpcInterfaceTest(unittest.TestCase,
+ planner_test_utils.PlannerTestMixin):
+ def setUp(self):
+ self._planner_common_setup()
+ self.god.stub_function(rpc_utils, 'start_plan')
+
+
+ def tearDown(self):
+ self._planner_common_teardown()
+
+
+ def test_submit_plan_success(self):
+ hosts = ('host1', 'host2')
+ plan_name = self._PLAN_NAME + '2'
+
+ rpc_utils.start_plan.expect_any_call()
+ rpc_interface.submit_plan(plan_name, hosts, ('label1',), ())
+
+ plan = models.Plan.objects.get(name=plan_name)
+ self.assertEqual(
+ set(afe_models.Host.objects.filter(hostname__in=hosts)),
+ set(plan.hosts.all()))
+
+ self.assertEqual(1, plan.host_labels.all().count())
+ self.assertEqual(afe_models.Label.objects.get(name='label1'),
+ plan.host_labels.all()[0])
+ self.god.check_playback()
+
+
+ def test_submit_plan_duplicate(self):
+ self.assertRaises(
+ model_logic.ValidationError, rpc_interface.submit_plan,
+ self._PLAN_NAME, (), (), ())
+
+
+ def test_submit_plan_bad_host(self):
+ self.assertRaises(
+ model_logic.ValidationError, rpc_interface.submit_plan,
+ self._PLAN_NAME + '2', ('fakehost'), (), ())
+
+
+ def test_submit_plan_bad_label(self):
+ self.assertRaises(
+ model_logic.ValidationError, rpc_interface.submit_plan,
+ self._PLAN_NAME + '2', (), ('fakelabel'), ())
+
+
+ def test_get_hosts(self):
+ hosts = rpc_interface.get_hosts(self._PLAN_NAME)
+ self.assertEqual(set(('host1', 'host2')), set(hosts))
+
+ afe_models.Host.objects.get(hostname='host3').labels.add(
+ afe_models.Label.objects.get(name='label1'))
+
+ hosts = rpc_interface.get_hosts(self._PLAN_NAME)
+ self.assertEqual(set(('host1', 'host2', 'host3')), set(hosts))
+
+ afe_models.Host.objects.get(hostname='host3').labels.clear()
+
+ hosts = rpc_interface.get_hosts(self._PLAN_NAME)
+ self.assertEqual(set(('host1', 'host2')), set(hosts))
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/frontend/planner/rpc_utils.py b/frontend/planner/rpc_utils.py
new file mode 100644
index 0000000..8d359bd
--- /dev/null
+++ b/frontend/planner/rpc_utils.py
@@ -0,0 +1,73 @@
+import common
+import os
+from autotest_lib.frontend.afe import models as afe_models, model_logic
+from autotest_lib.frontend.planner import models
+from autotest_lib.frontend.shared import rest_client
+from autotest_lib.client.common_lib import global_config, utils
+
+
+PLANNER_LABEL_PREFIX = 'planner_'
+PLANNER_ATOMIC_GROUP_NAME = 'planner_global_atomic_group'
+SERVER = global_config.global_config.get_config_value('SERVER', 'hostname')
+LAZY_LOADED_FILES = {}
+
+
+def create_plan_label(plan):
+ """
+ Creates the host label to apply on the plan hosts
+ """
+ group, _ = afe_models.AtomicGroup.objects.get_or_create(
+ name=PLANNER_ATOMIC_GROUP_NAME)
+ if group.invalid:
+ group.invalid = False
+ group.save()
+
+ name = PLANNER_LABEL_PREFIX + plan.name
+ if bool(afe_models.Label.valid_objects.filter(name=name)):
+ raise model_logic.ValidationError('Label %s already exists, '
+ 'cannot start plan' % name)
+ label = afe_models.Label(name=name, atomic_group=group)
+ label.save()
+
+ return label
+
+
+def start_plan(plan, label):
+ """
+ Takes the necessary steps to start a test plan in Autotest
+ """
+ afe_rest = rest_client.Resource.load(
+ 'http://%s/afe/server/resources' % SERVER)
+
+ keyvals = {'server': SERVER,
+ 'plan_id': plan.id,
+ 'label_name': label.name}
+
+ info = afe_rest.execution_info.get().execution_info
+ info['control_file'] = _get_execution_engine_control()
+ info['machines_per_execution'] = None
+
+ job_req = {'name': plan.name + '_execution_engine',
+ 'execution_info': info,
+ 'queue_entries': (),
+ 'keyvals': keyvals}
+
+ afe_rest.jobs.post(job_req)
+
+
+def _get_execution_engine_control():
+ """
+ Gets the control file to run the execution engine
+ """
+ return lazy_load(os.path.join(os.path.dirname(__file__),
+ 'execution_engine_control.srv'))
+
+
+def lazy_load(path):
+ """
+ Lazily loads the file indicated by the path given, and caches the result
+ """
+ if path not in LAZY_LOADED_FILES:
+ LAZY_LOADED_FILES[path] = utils.read_file(path)
+
+ return LAZY_LOADED_FILES[path]
diff --git a/frontend/planner/rpc_utils_unittest.py b/frontend/planner/rpc_utils_unittest.py
new file mode 100644
index 0000000..d168c05
--- /dev/null
+++ b/frontend/planner/rpc_utils_unittest.py
@@ -0,0 +1,69 @@
+#!/usr/bin/python
+
+import unittest
+import common
+from autotest_lib.frontend import setup_django_environment
+from autotest_lib.frontend.planner import planner_test_utils
+from autotest_lib.frontend.afe import model_logic, models as afe_models
+from autotest_lib.frontend.afe import rpc_interface as afe_rpc_interface
+from autotest_lib.frontend.planner import models, rpc_utils
+from autotest_lib.client.common_lib import utils
+
+
+class RpcUtilsTest(unittest.TestCase,
+ planner_test_utils.PlannerTestMixin):
+ def setUp(self):
+ self._planner_common_setup()
+
+
+ def tearDown(self):
+ self._planner_common_teardown()
+
+
+ def test_create_plan_label(self):
+ label, group = self._create_label_helper()
+
+ label.delete()
+ group.invalid = True
+ group.save()
+
+ label, group = self._create_label_helper()
+
+ self.assertRaises(model_logic.ValidationError,
+ rpc_utils.create_plan_label, self._plan)
+
+
+ def _create_label_helper(self):
+ label = rpc_utils.create_plan_label(self._plan)
+ group = afe_models.AtomicGroup.objects.get(
+ name=rpc_utils.PLANNER_ATOMIC_GROUP_NAME)
+ self.assertFalse(group.invalid)
+ self.assertEqual(label.atomic_group, group)
+
+ return (label, group)
+
+
+ def test_lazy_load(self):
+ self.god.stub_function(utils, 'read_file')
+
+ DUMMY_PATH_1 = object()
+ DUMMY_PATH_2 = object()
+ DUMMY_FILE_1 = object()
+ DUMMY_FILE_2 = object()
+
+ utils.read_file.expect_call(DUMMY_PATH_1).and_return(DUMMY_FILE_1)
+ self.assertEqual(DUMMY_FILE_1, rpc_utils.lazy_load(DUMMY_PATH_1))
+ self.god.check_playback()
+
+ # read_file should not be called again for this path
+ self.assertEqual(DUMMY_FILE_1, rpc_utils.lazy_load(DUMMY_PATH_1))
+ self.god.check_playback()
+
+ # new file; read_file must be called again
+ utils.read_file.expect_call(DUMMY_PATH_2).and_return(DUMMY_FILE_2)
+ self.assertEqual(DUMMY_FILE_2, rpc_utils.lazy_load(DUMMY_PATH_2))
+ self.god.check_playback()
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/frontend/planner/set_atomic_group_control.srv b/frontend/planner/set_atomic_group_control.srv
new file mode 100644
index 0000000..9d177c8
--- /dev/null
+++ b/frontend/planner/set_atomic_group_control.srv
@@ -0,0 +1,19 @@
+from autotest_lib.client.common_lib import utils
+from autotest_lib.frontend.shared import rest_client
+from autotest_lib.server import frontend
+
+keyvals = utils.read_keyval(job.resultdir)
+
+planner_rpc = frontend.Planner()
+afe_rest = rest_client.Resource.load(
+ 'http://%s/afe/server/resources' % keyvals['server'])
+
+
+label = afe_rest.labels.get(name=keyvals['label_name']).members[0].get()
+
+for machine in machines:
+ hostname = hosts.create_host(machine).hostname
+ host = afe_rest.hosts.get(hostname=hostname).members[0]
+ label.hosts.post({'host': host})
+
+planner_rpc.run('modify_plan', id=keyvals['plan_id'], initialized=True)