Implementation of the Test Planner execution engine. The engine is currently
able to schedule single-host tests and place the results in the proper planner
tables for analysis.
TODO: global support object, execution_engine.py unit tests
Signed-off-by: James Ren <jamesren@google.com>
git-svn-id: http://test.kernel.org/svn/autotest/trunk@4301 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/frontend/afe/model_logic.py b/frontend/afe/model_logic.py
index 59e5ffb..2f03fef 100644
--- a/frontend/afe/model_logic.py
+++ b/frontend/afe/model_logic.py
@@ -947,7 +947,7 @@
if isinstance(id_or_name, (int, long)):
return manager.get(pk=id_or_name)
- if isinstance(id_or_name, basestring):
+ if isinstance(id_or_name, basestring) and hasattr(cls, 'name_field'):
return manager.get(**{cls.name_field : id_or_name})
raise ValueError(
'Invalid positional argument: %s (%s)' % (id_or_name,
diff --git a/frontend/afe/resources.py b/frontend/afe/resources.py
index b31426a..9527acb 100644
--- a/frontend/afe/resources.py
+++ b/frontend/afe/resources.py
@@ -554,6 +554,8 @@
for label_name in meta_hosts:
entry = Label.from_uri_args(self._request, label_name)
entries.append({'meta_host': entry.link()})
+ if atomic_group_class:
+ entries.append({'atomic_group_class': atomic_group_class})
result = self.link()
result['queue_entries'] = entries
@@ -675,9 +677,9 @@
label_entry = containing_collection.resolve_link(
queue_entry['meta_host'])
metahost_label_objects.append(label_entry.instance)
- if 'atomic_group' in queue_entry:
+ if 'atomic_group_class' in queue_entry:
atomic_group_entry = containing_collection.resolve_link(
- queue_entry['atomic_group'])
+ queue_entry['atomic_group_class'])
if atomic_group:
assert atomic_group_entry.instance.id == atomic_group.id
else:
diff --git a/frontend/planner/execution_engine.py b/frontend/planner/execution_engine.py
index 30016c9..c0d1643 100644
--- a/frontend/planner/execution_engine.py
+++ b/frontend/planner/execution_engine.py
@@ -1,10 +1,26 @@
+import time, logging
+from autotest_lib.frontend.afe import model_attributes as afe_model_attributes
+from autotest_lib.frontend.shared import rest_client
+from autotest_lib.frontend.planner import model_attributes
+from autotest_lib.server import frontend
+
+
+TICK_INTERVAL_SECS = 10
+
class ExecutionEngine(object):
"""
Provides the Test Planner execution engine
"""
- def __init__(self, plan_id):
- self.plan_id = plan_id
+ _planner_rpc = frontend.Planner()
+ _tko_rpc = frontend.TKO()
+
+    def __init__(self, plan_id, server, label_name):
+        """
+        @param plan_id: ID of the test plan this engine executes
+        @param server: host name used to build the AFE REST resources URL
+        @param label_name: name of the label that _run_job() and _cleanup()
+                           look up through the AFE REST interface
+        """
+        resources_url = 'http://%s/afe/server/resources' % server
+        self._afe_rest = rest_client.Resource.load(resources_url)
+        self._plan_id = plan_id
+        self._server = server
+        self._label_name = label_name
def start(self):
@@ -13,4 +29,173 @@
Thread remains in this method until the execution engine is complete.
"""
- pass
+        self._initialize_plan()
+
+        # _tick() returns True once the plan is complete; until then, sleep
+        # one tick interval between passes
+        while not self._tick():
+            time.sleep(TICK_INTERVAL_SECS)
+
+        self._cleanup()
+
+
+    def _initialize_plan(self):
+        """
+        Gets the test plan ready to run.
+
+        Launches the '<plan name>_set_atomic_group' job unless an AFE job with
+        that name already exists, then blocks until the plan reports that it
+        has been initialized.
+        """
+        plan = self._planner_rpc.run('get_plan', id=self._plan_id)
+        job_name = plan['name'] + '_set_atomic_group'
+
+        existing_jobs = self._afe_rest.jobs.get(name=job_name)
+        if not existing_jobs.total_results:
+            self._launch_set_atomic_group_job(job_name)
+
+        self._wait_for_initialization()
+
+
+    def _launch_set_atomic_group_job(self, name):
+        """
+        Launch the job to set the hosts' atomic group, and initiate the plan
+
+        The job request is rebuilt and re-posted once per tick until the AFE
+        server accepts it. Any rest_client.ClientError raised by the post is
+        logged and retried; the expected cause is that the hosts are already
+        part of an atomic group. Returns when the job has been created.
+
+        @param name: the name to give the AFE job
+        """
+        while True:
+            # Re-fetch on every attempt so that a retry uses the current hosts
+            hosts = self._planner_rpc.run('get_hosts', plan_id=self._plan_id)
+            control = self._planner_rpc.run('get_atomic_group_control_file')
+
+            info = self._afe_rest.execution_info.get().execution_info
+            info['control_file'] = control
+            info['cleanup_before_job'] = afe_model_attributes.RebootBefore.NEVER
+            info['cleanup_after_job'] = afe_model_attributes.RebootAfter.NEVER
+            info['run_verify'] = False
+            info['machines_per_execution'] = len(hosts)
+
+            entries = self._afe_rest.queue_entries_request.get(
+                    hosts=hosts).queue_entries
+
+            keyvals = {'server': self._server,
+                       'label_name': self._label_name,
+                       'plan_id': self._plan_id}
+
+            job_req = {'name' : name,
+                       'execution_info' : info,
+                       'queue_entries' : entries,
+                       'keyvals' : keyvals}
+
+            try:
+                self._afe_rest.jobs.post(job_req)
+                logging.info('created job to set atomic group')
+                break
+            except rest_client.ClientError, e:
+                logging.info('hosts already in atomic group')
+                # Pass the argument to logging instead of pre-formatting it
+                logging.info('(error was %s)', e.message)
+                logging.info('waiting...')
+            time.sleep(TICK_INTERVAL_SECS)
+
+
+    def _wait_for_initialization(self):
+        """
+        Blocks until the plan's 'initialized' field is true.
+
+        The plan is fetched again once per tick interval.
+        """
+        while not self._planner_rpc.run('get_plan',
+                                        id=self._plan_id)['initialized']:
+            logging.info('waiting for initialization...')
+            time.sleep(TICK_INTERVAL_SECS)
+
+
+    def _cleanup(self):
+        """
+        Deletes the first label returned by the AFE for self._label_name.
+        """
+        matching_labels = self._afe_rest.labels.get(name=self._label_name)
+        plan_label = matching_labels.members[0]
+        plan_label.delete()
+
+
+    def _tick(self):
+        """
+        Runs a single pass of the execution engine.
+
+        Finalizes finished test runs, has the server refresh the test runs
+        table, and then schedules whatever should run next.
+
+        @return True if the engine has completed the plan.
+        """
+        logging.info('tick')
+        self._process_finished_runs()
+        self._check_tko_jobs()
+        plan_complete = self._schedule_new_runs()
+        return plan_complete
+
+
+    def _process_finished_runs(self):
+        """
+        Marks finished test runs as finalized.
+
+        Fetches the plan's PASSED and FAILED runs that are not yet finalized.
+        The host of each FAILED run is set to blocked, and every fetched run
+        is then set to finalized.
+        """
+        Status = model_attributes.TestRunStatus
+        rpc = self._planner_rpc
+        finished_statuses = (Status.PASSED, Status.FAILED)
+
+        finished_runs = rpc.run('get_test_runs', plan__id=self._plan_id,
+                                status__in=finished_statuses,
+                                finalized=False)
+        for test_run in finished_runs:
+            logging.info('finalizing test run %s', test_run)
+            failed = test_run['status'] == Status.FAILED
+            if failed:
+                rpc.run('modify_host', id=test_run['host'], blocked=True)
+            rpc.run('modify_test_run', id=test_run['id'], finalized=True)
+
+
+    def _check_tko_jobs(self):
+        """
+        Asks the server to refresh the Planner test runs table
+
+        Issues the 'update_test_runs' RPC for this plan and logs one line for
+        every test run the server reports as added or updated.
+        """
+        updates = self._planner_rpc.run('update_test_runs',
+                                        plan_id=self._plan_id)
+        for entry in updates:
+            status = entry['status']
+            tko_test_idx = entry['tko_test_idx']
+            hostname = entry['hostname']
+            logging.info('added %s test run for tko test id %s (%s)',
+                         status, tko_test_idx, hostname)
+
+
+    def _schedule_new_runs(self):
+        """
+        Starts a job for every test config the server says should run next.
+
+        @return True if the server reports the plan as complete (no jobs are
+                started in that case), False otherwise.
+        """
+        response = self._planner_rpc.run('get_next_test_configs',
+                                         plan_id=self._plan_id)
+        if not response['complete']:
+            for next_config in response['next_configs']:
+                self._run_job(
+                        hostname=next_config['host'],
+                        test_config_id=next_config['next_test_config_id'])
+            return False
+
+        return True
+
+
+    def _run_job(self, hostname, test_config_id):
+        """
+        Creates an AFE job for one test config on one host and records it
+
+        @param hostname: the host to run the job on
+        @param test_config_id: ID of the Planner test config to run
+        """
+        test_config = self._planner_rpc.run('get_test_config',
+                                            id=test_config_id)
+
+        # Start from the server-provided execution info and fill in the
+        # control file and control type of this test config
+        info = self._afe_rest.execution_info.get().execution_info
+        info['control_file'] = test_config['control_file']['contents']
+        info['is_server'] = test_config['is_server']
+
+        # The atomic group class comes from the first label matching
+        # self._label_name
+        atomic_group_class = self._afe_rest.labels.get(
+                name=self._label_name).members[0].get().atomic_group_class.href
+
+        request = self._afe_rest.queue_entries_request.get(
+                hosts=(hostname,), atomic_group_class=atomic_group_class)
+        entries = request.queue_entries
+
+        # The job name is prefixed with the plan's label override, falling
+        # back to the plan name when no override is set
+        plan = self._planner_rpc.run('get_plan', id=self._plan_id)
+        prefix = plan['label_override']
+        if prefix is None:
+            prefix = plan['name']
+        job_req = {'name' : '%s_%s_%s' % (prefix, test_config['alias'],
+                                          hostname),
+                   'execution_info' : info,
+                   'queue_entries' : entries}
+
+        logging.info('starting test alias %s for host %s',
+                     test_config['alias'], hostname)
+        job = self._afe_rest.jobs.post(job_req)
+        # Tell the Planner which AFE job was started for this test config
+        self._planner_rpc.run('add_job',
+                              plan_id=self._plan_id,
+                              test_config_id=test_config_id,
+                              afe_job_id=job.get().id)
diff --git a/frontend/planner/execution_engine_control.srv b/frontend/planner/execution_engine_control.srv
index 96a4250..915196b 100644
--- a/frontend/planner/execution_engine_control.srv
+++ b/frontend/planner/execution_engine_control.srv
@@ -1,60 +1,9 @@
-import time
from autotest_lib.client.common_lib import utils
-from autotest_lib.frontend.afe import model_attributes
-from autotest_lib.frontend.shared import rest_client
from autotest_lib.frontend.planner import execution_engine
-from autotest_lib.server import frontend
-
-
-TICK_INTERVAL_SECS = 10
keyvals = utils.read_keyval(job.resultdir)
-planner_rpc = frontend.Planner()
-afe_rest = rest_client.Resource.load(
- 'http://%s/afe/server/resources' % keyvals['server'])
-
-
-def _launch_set_atomic_group_job(plan, name):
- """Launch the job to set the hosts' atomic group, and initate the plan"""
- hosts = planner_rpc.run('get_hosts', plan_id=keyvals['plan_id'])
- control = planner_rpc.run('get_atomic_group_control_file')
-
- info = afe_rest.execution_info.get().execution_info
- info['control_file'] = control
- info['cleanup_before_job'] = model_attributes.RebootBefore.NEVER
- info['cleanup_after_job'] = model_attributes.RebootAfter.NEVER
- info['run_verify'] = False
- info['machines_per_execution'] = len(hosts)
-
- entries = afe_rest.queue_entries_request.get(hosts=hosts).queue_entries
-
- job_req = {'name' : name,
- 'execution_info' : info,
- 'queue_entries' : entries,
- 'keyvals' : keyvals}
-
- afe_rest.jobs.post(job_req)
-
-
-# Check if the plan is already being initialized, and launch the initialization
-# job if not
-plan = planner_rpc.run('get_plan', id=keyvals['plan_id'])
-name = plan['name'] + '_set_atomic_group'
-if not afe_rest.jobs.get(name=name).total_results:
- _launch_set_atomic_group_job(plan, name)
-
-
-# Wait for the plan to be initialized
-while True:
- if planner_rpc.run('get_plan', id=keyvals['plan_id'])['initialized']:
- break
- time.sleep(TICK_INTERVAL_SECS)
-
-
-# Execution engine main loop
-execution_engine.ExecutionEngine(plan_id=keyvals['plan_id']).start()
-
-
-# Cleanup
-afe_rest.labels.get(name=keyvals['label_name']).members[0].delete()
+# Run the plan named by the job keyvals; start() does not return until the
+# execution engine is complete
+engine = execution_engine.ExecutionEngine(plan_id=keyvals['plan_id'],
+                                          server=keyvals['server'],
+                                          label_name=keyvals['label_name'])
+engine.start()
diff --git a/frontend/planner/models.py b/frontend/planner/models.py
index 52ff1bc..f69aa28 100644
--- a/frontend/planner/models.py
+++ b/frontend/planner/models.py
@@ -3,7 +3,8 @@
from autotest_lib.frontend.afe import models as afe_models
from autotest_lib.frontend.afe import model_logic, rpc_utils
from autotest_lib.frontend.tko import models as tko_models
-from autotest_lib.client.common_lib import enum, utils
+from autotest_lib.frontend.planner import model_attributes
+from autotest_lib.client.common_lib import utils
class Plan(dbmodels.Model, model_logic.ModelExtensions):
@@ -75,12 +76,13 @@
host: The AFE host
complete: True if and only if this host is finished in the test plan
blocked: True if and only if the host is blocked (not executing tests)
+ added_by_label: True if and only if the host was added because of a host
+ label (as opposed to being explicitly added)
"""
host = dbmodels.ForeignKey(afe_models.Host)
complete = dbmodels.BooleanField(default=False)
blocked = dbmodels.BooleanField(default=False)
-
- Status = enum.Enum('Finished', 'Running', 'Blocked', string_values=True)
+ added_by_label = dbmodels.BooleanField(default=False)
class Meta:
db_table = 'planner_hosts'
@@ -88,30 +90,26 @@
def status(self):
if self.complete:
- return Host.Status.FINISHED
+ return model_attributes.HostStatus.FINISHED
if self.blocked:
- return Host.Status.BLOCKED
- return Host.Status.RUNNING
+ return model_attributes.HostStatus.BLOCKED
+ return model_attributes.HostStatus.RUNNING
def _get_details_unicode(self):
return 'Host: %s' % self.host.hostname
- @classmethod
- def smart_get(cls, id):
- raise NotImplementedError('Planner hosts do not support smart_get()')
-
-
-class ControlFile(model_logic.ModelWithHash):
+class ControlFile(model_logic.ModelWithHash,
+ model_logic.ModelExtensions):
"""A control file. Immutable once added to the table
Required:
contents: The text of the control file
Others:
- control_hash: The SHA1 hash of the control file, for duplicate detection
- and fast search
+ the_hash: The SHA1 hash of the control file, for duplicate detection
+ and fast search
"""
contents = dbmodels.TextField()
@@ -128,12 +126,13 @@
return u'Control file id %s (SHA1: %s)' % (self.id, self.control_hash)
-class Test(ModelWithPlan):
+class TestConfig(ModelWithPlan, model_logic.ModelExtensions):
"""A planned test
Required:
alias: The name to give this test within the plan. Unique with plan id
test_control_file: The control file to run
+ is_server: True if this control file is a server-side test
execution_order: An integer describing when this test should be run in
the test plan
estimated_runtime: Time in hours that the test is expected to run. Will
@@ -142,27 +141,28 @@
"""
alias = dbmodels.CharField(max_length=255)
control_file = dbmodels.ForeignKey(ControlFile)
+ is_server = dbmodels.BooleanField(default=True)
execution_order = dbmodels.IntegerField(blank=True)
estimated_runtime = dbmodels.IntegerField()
class Meta:
- db_table = 'planner_tests'
+ db_table = 'planner_test_configs'
ordering = ('execution_order',)
unique_together = (('plan', 'alias'),)
def _get_details_unicode(self):
- return 'Planned test - Control file id %s' % self.control_file.id
+ return 'Planned test config - Control file id %s' % self.control_file.id
-class Job(ModelWithPlan):
+class Job(ModelWithPlan, model_logic.ModelExtensions):
"""Represents an Autotest job initiated for a test plan
Required:
- test: The Test associated with this Job
+        test_config: The TestConfig associated with this Job
afe_job: The Autotest job
"""
- test = dbmodels.ForeignKey(Test)
+ test_config = dbmodels.ForeignKey(TestConfig)
afe_job = dbmodels.ForeignKey(afe_models.Job)
class Meta:
@@ -189,7 +189,7 @@
return u'Bug external ID %s' % self.external_uid
-class TestRun(ModelWithPlan):
+class TestRun(ModelWithPlan, model_logic.ModelExtensions):
"""An individual test run from an Autotest job for the test plan.
Each Job object may have multiple TestRun objects associated with it.
@@ -209,12 +209,13 @@
Optional:
bugs: Bugs filed that a relevant to this run
"""
- Status = enum.Enum('Active', 'Passed', 'Failed', string_values=True)
-
test_job = dbmodels.ForeignKey(Job)
tko_test = dbmodels.ForeignKey(tko_models.Test)
host = dbmodels.ForeignKey(Host)
- status = dbmodels.CharField(max_length=16, choices=Status.choices())
+ status = dbmodels.CharField(
+ max_length=16,
+ choices=model_attributes.TestRunStatus.choices(),
+ default=model_attributes.TestRunStatus.ACTIVE)
finalized = dbmodels.BooleanField(default=False)
seen = dbmodels.BooleanField(default=False)
triaged = dbmodels.BooleanField(default=False)
@@ -224,6 +225,7 @@
class Meta:
db_table = 'planner_test_runs'
+ unique_together = (('plan', 'test_job', 'tko_test', 'host'),)
def _get_details_unicode(self):
@@ -294,12 +296,11 @@
name: The name given to the object
encoded_object: The actual object
"""
- Type = enum.Enum('support', 'triage', 'autoprocess', 'custom_query',
- string_values=True)
-
user = dbmodels.ForeignKey(afe_models.User)
- object_type = dbmodels.CharField(max_length=16,
- choices=Type.choices(), db_column='type')
+ object_type = dbmodels.CharField(
+ max_length=16,
+ choices=model_attributes.SavedObjectType.choices(),
+ db_column='type')
name = dbmodels.CharField(max_length=255)
encoded_object = dbmodels.TextField()
@@ -337,8 +338,8 @@
value: The value
Others:
- keyval_hash: The result of SHA1(SHA1(key) ++ value), for duplicate
- detection and fast search.
+ the_hash: The result of SHA1(SHA1(key) ++ value), for duplicate
+ detection and fast search.
"""
key = dbmodels.CharField(max_length=1024)
value = dbmodels.CharField(max_length=1024)
diff --git a/frontend/planner/rpc_interface.py b/frontend/planner/rpc_interface.py
index eaf4de9..219436f 100644
--- a/frontend/planner/rpc_interface.py
+++ b/frontend/planner/rpc_interface.py
@@ -11,6 +11,7 @@
from autotest_lib.frontend import thread_local
from autotest_lib.frontend.afe import model_logic, models as afe_models
from autotest_lib.frontend.afe import rpc_utils as afe_rpc_utils
+from autotest_lib.frontend.tko import models as tko_models
from autotest_lib.frontend.planner import models, rpc_utils
from autotest_lib.client.common_lib import utils
@@ -26,6 +27,32 @@
models.Plan.smart_get(id).update_object(data)
+def get_test_runs(**filter_data):
+    """
+    Gets the Planner test runs that match the given filters
+
+    @param filter_data: keyword arguments passed to TestRun.objects.filter()
+    @return the object dictionaries of the matching test runs, after
+            afe_rpc_utils.prepare_for_serialization()
+    """
+    test_runs = models.TestRun.objects.filter(**filter_data)
+    object_dicts = [test_run.get_object_dict() for test_run in test_runs]
+    return afe_rpc_utils.prepare_for_serialization(object_dicts)
+
+
+def modify_test_run(id, **data):
+    """
+    Updates the Planner test run with the given ID
+
+    @param id: ID of the test run
+    @param data: the fields to update, passed to update_object()
+    """
+    test_run = models.TestRun.objects.get(id=id)
+    test_run.update_object(data)
+
+
+def modify_host(id, **data):
+    """
+    Updates the Planner host with the given ID
+
+    @param id: ID of the Planner host
+    @param data: the fields to update, passed to update_object()
+    """
+    planner_host = models.Host.objects.get(id=id)
+    planner_host.update_object(data)
+
+
+def get_test_config(id):
+    """
+    Gets the Planner test config with the given ID
+
+    @param id: ID of the test config
+    @return the first row produced by
+            afe_rpc_utils.prepare_rows_as_nested_dicts() for that test config,
+            with 'control_file' as the nested field
+    """
+    matches = models.TestConfig.objects.filter(id=id)
+    rows = afe_rpc_utils.prepare_rows_as_nested_dicts(matches,
+                                                      ('control_file',))
+    return rows[0]
+
+
+def add_job(plan_id, test_config_id, afe_job_id):
+    """
+    Records an AFE job as a Planner job
+
+    @param plan_id: ID of the test plan
+    @param test_config_id: ID of the Planner test config the job runs
+    @param afe_job_id: ID of the AFE job
+    """
+    plan = models.Plan.objects.get(id=plan_id)
+    test_config = models.TestConfig.objects.get(id=test_config_id)
+    afe_job = afe_models.Job.objects.get(id=afe_job_id)
+    models.Job.objects.create(plan=plan, test_config=test_config,
+                              afe_job=afe_job)
+
+
# more advanced calls
def submit_plan(name, hosts, host_labels, tests,
@@ -37,7 +64,12 @@
@param hosts: a list of hostnames
@param host_labels: a list of host labels. The hosts under test will update
to reflect changes in the label
- @param tests: a list of test control files to run
+ @param tests: an ordered list of dictionaries:
+ alias: an alias for the test
+ control_file: the test control file
+ is_server: True if is a server-side control file
+ estimated_runtime: estimated number of hours this test
+ will run
@param support: the global support object
@param label_override: label to prepend to all AFE jobs for this test plan.
Defaults to the plan name.
@@ -60,6 +92,21 @@
raise model_logic.ValidationError(
{'host_labels': 'host label %s does not exist' % label})
+    # Every test needs all of the required fields, and aliases must be unique
+    aliases_seen = set()
+    test_required_fields = (
+            'alias', 'control_file', 'is_server', 'estimated_runtime')
+    for test in tests:
+        for field in test_required_fields:
+            if field not in test:
+                raise model_logic.ValidationError(
+                        {'tests': 'field %s is required' % field})
+
+        alias = test['alias']
+        if alias in aliases_seen:
+            raise model_logic.ValidationError(
+                    {'tests': 'alias %s occurs more than once' % alias})
+        aliases_seen.add(alias)
+
plan, created = models.Plan.objects.get_or_create(name=name)
if not created:
raise model_logic.ValidationError(
@@ -67,25 +114,36 @@
try:
label = rpc_utils.create_plan_label(plan)
+ try:
+ for i, test in enumerate(tests):
+ control, _ = models.ControlFile.objects.get_or_create(
+ contents=test['control_file'])
+ models.TestConfig.objects.create(
+ plan=plan, alias=test['alias'], control_file=control,
+ is_server=test['is_server'], execution_order=i,
+ estimated_runtime=test['estimated_runtime'])
+
+ plan.label_override = label_override
+ plan.support = support or ''
+ plan.save()
+
+ plan.owners.add(afe_models.User.current_user())
+
+ for host in host_objects:
+ planner_host = models.Host.objects.create(plan=plan, host=host)
+
+ plan.host_labels.add(*label_objects)
+
+ rpc_utils.start_plan(plan, label)
+
+ return plan.id
+ except:
+ label.delete()
+ raise
except:
plan.delete()
raise
- plan.label_override = label_override
- plan.support = support or ''
- plan.save()
-
- plan.owners.add(afe_models.User.current_user())
-
- for host in host_objects:
- planner_host = models.Host.objects.create(plan=plan, host=host)
-
- plan.host_labels.add(*label_objects)
-
- rpc_utils.start_plan(plan, label)
-
- return plan.id
-
def get_hosts(plan_id):
"""
@@ -108,3 +166,72 @@
"""
return rpc_utils.lazy_load(os.path.join(os.path.dirname(__file__),
'set_atomic_group_control.srv'))
+
+
+def get_next_test_configs(plan_id):
+    """
+    Gets information about the next planner test configs that need to be run
+
+    Refreshes the plan's hosts table first, and checks the plan for
+    completion after the next config of every host has been computed.
+
+    @param plan_id: the ID or name of the test plan
+    @return a dictionary:
+                complete: True or False, shows test plan completion
+                next_configs: a list of dictionaries:
+                    host: hostname of the host
+                    next_test_config_id: ID of the next Planner test to run
+    """
+    plan = models.Plan.smart_get(plan_id)
+    rpc_utils.update_hosts_table(plan)
+
+    next_configs = []
+    for planner_host in models.Host.objects.filter(plan=plan):
+        config_id = rpc_utils.compute_next_test_config(plan, planner_host)
+        if not config_id:
+            # Nothing to schedule on this host right now
+            continue
+        next_configs.append({'next_test_config_id': config_id,
+                             'host': planner_host.host.hostname})
+
+    rpc_utils.check_for_completion(plan)
+
+    return {'next_configs': next_configs,
+            'complete': plan.complete}
+
+
+def update_test_runs(plan_id):
+    """
+    Add all applicable TKO tests to the Planner DB tables
+
+    Looks at the TKO tests that belong to the AFE jobs started for the test
+    plan. A TKO test is added to the Planner tables when it has no test run
+    yet, and is updated when its computed status differs from the status
+    already recorded for it (for example when it moves from an active status
+    to a completed status).
+
+    @param plan_id: the ID of the test plan
+    @return a list of dictionaries:
+                status: the status of the new (or updated) test run
+                tko_test_idx: the ID of the TKO test added
+                hostname: the host added
+    """
+    plan = models.Plan.objects.get(id=plan_id)
+    updated = []
+
+    for planner_job in plan.job_set.all():
+        # Statuses already recorded for this job, keyed by TKO test index
+        known_statuses = {}
+        for test_run in planner_job.testrun_set.all():
+            known_statuses[test_run.tko_test.test_idx] = test_run.status
+
+        tko_tests = tko_models.Test.objects.filter(
+                job__afe_job_id=planner_job.afe_job.id)
+
+        for tko_test in tko_tests:
+            status = rpc_utils.compute_test_run_status(tko_test.status.word)
+            test_idx = tko_test.test_idx
+            if (test_idx in known_statuses and
+                    status == known_statuses[test_idx]):
+                # Already recorded with this status
+                continue
+
+            # The machine hostname may be a comma-separated list of hosts
+            for hostname in tko_test.machine.hostname.split(','):
+                rpc_utils.add_test_run(
+                        plan, planner_job, tko_test, hostname, status)
+                updated.append({'status': status,
+                                'tko_test_idx': tko_test.test_idx,
+                                'hostname': hostname})
+
+    return updated
diff --git a/frontend/planner/rpc_interface_unittest.py b/frontend/planner/rpc_interface_unittest.py
index 0f1c71b..081e433 100644
--- a/frontend/planner/rpc_interface_unittest.py
+++ b/frontend/planner/rpc_interface_unittest.py
@@ -3,10 +3,11 @@
import unittest
import common
from autotest_lib.frontend import setup_django_environment
-from autotest_lib.frontend.planner import planner_test_utils
+from autotest_lib.frontend.planner import planner_test_utils, model_attributes
+from autotest_lib.frontend.planner import rpc_interface, models, rpc_utils
from autotest_lib.frontend.afe import model_logic
from autotest_lib.frontend.afe import models as afe_models
-from autotest_lib.frontend.planner import rpc_interface, models, rpc_utils
+from autotest_lib.frontend.tko import models as tko_models
class RpcInterfaceTest(unittest.TestCase,
@@ -72,5 +73,110 @@
self.assertEqual(set(('host1', 'host2')), set(hosts))
+    def test_get_next_test_configs(self):
+        """
+        get_next_test_configs() returns the config computed for each host and
+        the completion flag set by check_for_completion()
+        """
+        DUMMY_CONFIGS = {'host1': object(),
+                         'host2': object()}
+        DUMMY_COMPLETE = object()
+        self.god.stub_function(rpc_utils, 'compute_next_test_config')
+
+        for host in models.Host.objects.filter(plan=self._plan):
+            rpc_utils.compute_next_test_config.expect_call(
+                    self._plan, host).and_return(
+                    DUMMY_CONFIGS[host.host.hostname])
+
+        def _dummy_check_for_completion(plan):
+            plan.complete = DUMMY_COMPLETE
+
+        # Replace check_for_completion() only for the duration of the call, so
+        # that the dummy does not leak into tests that run afterwards
+        real_check_for_completion = rpc_utils.check_for_completion
+        rpc_utils.check_for_completion = _dummy_check_for_completion
+        try:
+            result = rpc_interface.get_next_test_configs(self._plan.id)
+        finally:
+            rpc_utils.check_for_completion = real_check_for_completion
+
+        self.god.check_playback()
+        self.assertEqual(result['complete'], DUMMY_COMPLETE)
+        for config in result['next_configs']:
+            self.assertTrue(config['host'] in DUMMY_CONFIGS)
+            self.assertEqual(config['next_test_config_id'],
+                             DUMMY_CONFIGS[config['host']])
+
+
+    def test_update_test_runs(self):
+        """
+        update_test_runs() reports a TKO test when it is new or when its
+        computed status changes, and reports nothing otherwise
+
+        compute_test_run_status() and add_test_run() are stubbed, so the test
+        creates the TestRun row itself between the steps.
+        """
+        GOOD_STATUS_WORD = 'GOOD'
+        RUNNING_STATUS_WORD = 'RUNNING'
+        hostname = self.hosts[0].hostname
+
+        self.god.stub_function(rpc_utils, 'compute_test_run_status')
+        self.god.stub_function(rpc_utils, 'add_test_run')
+
+        # Planner and TKO fixtures: one planner job backed by one AFE job,
+        # plus the TKO job that carries the same AFE job ID
+        control, _ = models.ControlFile.objects.get_or_create(
+                contents='test_control')
+        test_config = models.TestConfig.objects.create(plan=self._plan,
+                                                       alias='config',
+                                                       control_file=control,
+                                                       execution_order=1,
+                                                       estimated_runtime=1)
+        afe_job = self._create_job(hosts=(1,))
+        planner_host = models.Host.objects.create(plan=self._plan,
+                                                  host=self.hosts[0])
+        planner_job = models.Job.objects.create(plan=self._plan,
+                                                test_config=test_config,
+                                                afe_job=afe_job)
+        tko_machine = tko_models.Machine.objects.create(hostname=hostname)
+        tko_job = tko_models.Job.objects.create(tag='job',
+                                                machine=tko_machine,
+                                                afe_job_id=afe_job.id)
+        tko_kernel = tko_models.Kernel.objects.create()
+        running_status = tko_models.Status.objects.create(
+                word=RUNNING_STATUS_WORD)
+        good_status = tko_models.Status.objects.create(word=GOOD_STATUS_WORD)
+
+        # No TKO tests
+        self.assertEqual([], rpc_interface.update_test_runs(self._plan.id))
+        self.god.check_playback()
+
+        # active TKO test
+        tko_test = tko_models.Test.objects.create(job=tko_job,
+                                                  machine=tko_machine,
+                                                  kernel=tko_kernel,
+                                                  status=running_status)
+
+        rpc_utils.compute_test_run_status.expect_call(
+                RUNNING_STATUS_WORD).and_return(
+                model_attributes.TestRunStatus.ACTIVE)
+        rpc_utils.add_test_run.expect_call(
+                self._plan, planner_job, tko_test, hostname,
+                model_attributes.TestRunStatus.ACTIVE)
+        self.assertEqual(rpc_interface.update_test_runs(self._plan.id),
+                         [{'status': model_attributes.TestRunStatus.ACTIVE,
+                           'tko_test_idx': tko_test.test_idx,
+                           'hostname': hostname}])
+        self.god.check_playback()
+        # add_test_run() is stubbed out, so record the active run by hand
+        test_run = models.TestRun.objects.create(
+                plan=self._plan, test_job=planner_job,
+                tko_test=tko_test, host=planner_host,
+                status=model_attributes.TestRunStatus.ACTIVE)
+
+        # no change to TKO test
+        rpc_utils.compute_test_run_status.expect_call(
+                RUNNING_STATUS_WORD).and_return(
+                model_attributes.TestRunStatus.ACTIVE)
+        self.assertEqual([], rpc_interface.update_test_runs(self._plan.id))
+        self.god.check_playback()
+
+        # TKO test is now complete, passed
+        tko_test.status = good_status
+        tko_test.save()
+
+        rpc_utils.compute_test_run_status.expect_call(
+                GOOD_STATUS_WORD).and_return(
+                model_attributes.TestRunStatus.PASSED)
+        rpc_utils.add_test_run.expect_call(
+                self._plan, planner_job, tko_test, hostname,
+                model_attributes.TestRunStatus.PASSED)
+        self.assertEqual(rpc_interface.update_test_runs(self._plan.id),
+                         [{'status': model_attributes.TestRunStatus.PASSED,
+                           'tko_test_idx': tko_test.test_idx,
+                           'hostname': hostname}])
+        self.god.check_playback()
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/frontend/planner/rpc_utils.py b/frontend/planner/rpc_utils.py
index 8d359bd..79a6dcc 100644
--- a/frontend/planner/rpc_utils.py
+++ b/frontend/planner/rpc_utils.py
@@ -1,8 +1,7 @@
import common
import os
from autotest_lib.frontend.afe import models as afe_models, model_logic
-from autotest_lib.frontend.planner import models
-from autotest_lib.frontend.shared import rest_client
+from autotest_lib.frontend.planner import models, model_attributes
from autotest_lib.client.common_lib import global_config, utils
@@ -36,23 +35,22 @@
"""
Takes the necessary steps to start a test plan in Autotest
"""
- afe_rest = rest_client.Resource.load(
- 'http://%s/afe/server/resources' % SERVER)
-
keyvals = {'server': SERVER,
'plan_id': plan.id,
'label_name': label.name}
-
- info = afe_rest.execution_info.get().execution_info
- info['control_file'] = _get_execution_engine_control()
- info['machines_per_execution'] = None
-
- job_req = {'name': plan.name + '_execution_engine',
- 'execution_info': info,
- 'queue_entries': (),
+ options = {'name': plan.name + '_execution_engine',
+ 'priority': afe_models.Job.Priority.MEDIUM,
+ 'control_file': _get_execution_engine_control(),
+ 'control_type': afe_models.Job.ControlType.SERVER,
+ 'synch_count': None,
+ 'run_verify': False,
+ 'reboot_before': False,
+ 'reboot_after': False,
+ 'dependencies': (),
'keyvals': keyvals}
-
- afe_rest.jobs.post(job_req)
+ job = afe_models.Job.create(owner=afe_models.User.current_user().login,
+ options=options, hosts=())
+ job.queue(hosts=())
def _get_execution_engine_control():
@@ -71,3 +69,93 @@
LAZY_LOADED_FILES[path] = utils.read_file(path)
return LAZY_LOADED_FILES[path]
+
+
+def update_hosts_table(plan):
+    """
+    Synchronizes the planner Hosts model with the plan's host labels
+
+    Every AFE host under one of the plan's host labels gets a planner host;
+    hosts created here are flagged as added_by_label. Planner hosts that were
+    added by label but are no longer under any of the labels are deleted.
+    """
+    label_host_ids = set()
+
+    for label in plan.host_labels.all():
+        for afe_host in label.host_set.all():
+            planner_host, is_new = models.Host.objects.get_or_create(
+                    plan=plan, host=afe_host)
+            if is_new:
+                planner_host.added_by_label = True
+                planner_host.save()
+
+            label_host_ids.add(planner_host.host.id)
+
+    stale_hosts = models.Host.objects.filter(plan=plan, added_by_label=True)
+    stale_hosts = stale_hosts.exclude(host__id__in=label_host_ids)
+    stale_hosts.delete()
+
+
+def compute_next_test_config(plan, host):
+    """
+    Gets the next test config that should be run for this plan and host
+
+    Walks the plan's test configs in execution order and returns the ID of
+    the first one that has no host queue entry for this host.
+
+    Returns None if the host is blocked, or if one of the host queue entries
+    found along the way is not complete yet. Also returns None, after setting
+    the host's complete bit, when every test config has run on the host.
+    """
+    if host.blocked:
+        return None
+
+    for test_config in plan.testconfig_set.order_by('execution_order'):
+        planner_jobs = plan.job_set.filter(test_config=test_config)
+        afe_job_ids = planner_jobs.values_list('afe_job', flat=True)
+        hqes = afe_models.HostQueueEntry.objects.filter(
+                job__id__in=afe_job_ids, host=host.host)
+        if not hqes:
+            # This test config has not been scheduled on the host yet
+            return test_config.id
+
+        unfinished = [hqe for hqe in hqes if not hqe.complete]
+        if unfinished:
+            # The host is still busy; do not run another test on it
+            return None
+
+    # Every test config has a complete host queue entry for this host
+    host.complete = True
+    host.save()
+    return None
+
+
+def check_for_completion(plan):
+    """
+    Sets complete=True on the plan, and saves it, if none of the plan's
+    hosts is still incomplete
+    """
+    unfinished_hosts = models.Host.objects.filter(plan=plan, complete=False)
+    if unfinished_hosts:
+        return
+
+    plan.complete = True
+    plan.save()
+
+
+def compute_test_run_status(status):
+    """
+    Maps a TKO test status word to a Planner test run status
+
+    'GOOD' maps to PASSED and 'RUNNING' maps to ACTIVE; every other status
+    word maps to FAILED.
+    """
+    Status = model_attributes.TestRunStatus
+    result = Status.FAILED
+    if status == 'GOOD':
+        result = Status.PASSED
+    elif status == 'RUNNING':
+        result = Status.ACTIVE
+    return result
+
+
+def add_test_run(plan, planner_job, tko_test, hostname, status):
+    """
+    Records a TKO test in the Planner Test Run tables
+
+    Gets or creates the test run identified by the plan, planner job, TKO
+    test and planner host, then stores the given status on it.
+    """
+    afe_host = afe_models.Host.objects.get(hostname=hostname)
+    planner_host = models.Host.objects.get(plan=plan, host=afe_host)
+
+    lookup = dict(plan=plan, test_job=planner_job, tko_test=tko_test,
+                  host=planner_host)
+    test_run, _ = models.TestRun.objects.get_or_create(**lookup)
+
+    test_run.status = status
+    test_run.save()
diff --git a/frontend/planner/views.py b/frontend/planner/views.py
index 541e707..d2d4837 100644
--- a/frontend/planner/views.py
+++ b/frontend/planner/views.py
@@ -15,3 +15,10 @@
def rpc_documentation(request):
return rpc_handler_obj.get_rpc_documentation()
+
+
+def model_documentation(request):
+    """
+    Returns the documentation view for the listed Planner models
+    """
+    documented_models = (
+            'Plan', 'Host', 'ControlFile', 'TestConfig', 'Job', 'Bug',
+            'TestRun', 'DataType', 'History', 'SavedObject', 'KeyVal',
+            'AutoProcess')
+    return views_common.model_documentation(models, documented_models)