blob: 6d97dd197fde7947d88b552cfb744389d71f87c6 [file] [log] [blame]
#!/usr/bin/python
import unittest
import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend import setup_test_environment
from autotest_lib.frontend.afe import model_logic, models as afe_models
from autotest_lib.frontend.planner import planner_test_utils, model_attributes
from autotest_lib.frontend.planner import models, rpc_utils, failure_actions
from autotest_lib.frontend.tko import models as tko_models
from autotest_lib.client.common_lib import utils, host_queue_entry_states
class RpcUtilsTest(unittest.TestCase,
                   planner_test_utils.PlannerTestMixin):
    """Unit tests for the helpers in the planner frontend's rpc_utils module.

    Fixtures such as self._plan, self.god, self.hosts and self.labels are
    not defined here; they are expected to come from PlannerTestMixin via
    _planner_common_setup() and _setup_active_plan().
    """

    def setUp(self):
        """Build the common planner fixtures before every test."""
        self._planner_common_setup()

    def tearDown(self):
        """Tear down the common planner fixtures after every test."""
        self._planner_common_teardown()

    def test_create_plan_label(self):
        """Check plan label creation, group re-validation and duplicates."""
        label, group = self._create_label_helper()

        # Drop the label and invalidate its atomic group. The helper asserts
        # that the group is valid again once the label has been re-created.
        label.delete()
        group.invalid = True
        group.save()

        self._create_label_helper()

        # The label now exists, so a further creation attempt must fail.
        self.assertRaises(model_logic.ValidationError,
                          rpc_utils.create_plan_label, self._plan)

    def _create_label_helper(self):
        """Create the plan label and verify its atomic group.

        Returns:
            A (label, group) tuple: the label returned by
            rpc_utils.create_plan_label() and the AtomicGroup named
            rpc_utils.PLANNER_ATOMIC_GROUP_NAME.
        """
        label = rpc_utils.create_plan_label(self._plan)
        group = afe_models.AtomicGroup.objects.get(
                name=rpc_utils.PLANNER_ATOMIC_GROUP_NAME)
        self.assertFalse(group.invalid)
        self.assertEqual(label.atomic_group, group)

        return (label, group)

    def test_lazy_load(self):
        """Check that lazy_load() reads each distinct path only once."""
        self.god.stub_function(utils, 'read_file')

        DUMMY_PATH_1 = object()
        DUMMY_PATH_2 = object()
        DUMMY_FILE_1 = object()
        DUMMY_FILE_2 = object()

        utils.read_file.expect_call(DUMMY_PATH_1).and_return(DUMMY_FILE_1)
        self.assertEqual(DUMMY_FILE_1, rpc_utils.lazy_load(DUMMY_PATH_1))
        self.god.check_playback()

        # read_file should not be called again for this path
        self.assertEqual(DUMMY_FILE_1, rpc_utils.lazy_load(DUMMY_PATH_1))
        self.god.check_playback()

        # new file; read_file must be called again
        utils.read_file.expect_call(DUMMY_PATH_2).and_return(DUMMY_FILE_2)
        self.assertEqual(DUMMY_FILE_2, rpc_utils.lazy_load(DUMMY_PATH_2))
        self.god.check_playback()

    def test_update_hosts_table(self):
        """Check that label-derived hosts follow the plan's host labels."""
        label = self.labels[3]
        default_hosts = set(self._plan.hosts.all())

        # No host labels yet: the table must be left unchanged.
        rpc_utils.update_hosts_table(self._plan)
        self.assertEqual(default_hosts, set(self._plan.hosts.all()))
        self.assertEqual(set(), self._get_added_by_label_hosts())

        # Adding a label pulls in that label's hosts, flagged as label-added.
        self._plan.host_labels.add(label)
        rpc_utils.update_hosts_table(self._plan)
        self.assertEqual(default_hosts.union(label.host_set.all()),
                         set(self._plan.hosts.all()))
        self.assertEqual(set(label.host_set.all()),
                         self._get_added_by_label_hosts())

        # Removing the label takes the label-added hosts back out.
        self._plan.host_labels.remove(label)
        rpc_utils.update_hosts_table(self._plan)
        self.assertEqual(default_hosts, set(self._plan.hosts.all()))
        self.assertEqual(set(), self._get_added_by_label_hosts())

    def _get_added_by_label_hosts(self):
        """Return the set of hosts in this plan flagged added_by_label."""
        return set(host.host for host in models.Host.objects.filter(
                plan=self._plan, added_by_label=True))

    def test_compute_next_test_config(self):
        """Walk a host through two test configs until it is complete.

        The 'complete' flag is read from the same in-memory planner host
        object that is passed to compute_next_test_config().
        """
        self._setup_active_plan()
        test_config = models.TestConfig.objects.create(
                plan=self._plan, alias='config2', control_file=self._control,
                execution_order=2, estimated_runtime=1)

        # The existing job's only queue entry has not completed yet, so no
        # further config may be handed out.
        self.assertEqual(1, self._afe_job.hostqueueentry_set.count())
        self.assertEqual(
                None, rpc_utils.compute_next_test_config(self._plan,
                                                         self._planner_host))
        self.assertFalse(self._planner_host.complete)

        # Once that entry completes, the newly created config is next.
        hqe = self._afe_job.hostqueueentry_set.all()[0]
        hqe.status = host_queue_entry_states.Status.COMPLETED
        hqe.save()

        self.assertEqual(
                test_config,
                rpc_utils.compute_next_test_config(self._plan,
                                                   self._planner_host))
        self.assertFalse(self._planner_host.complete)

        # A job for the second config exists but has not completed.
        afe_job = self._create_job(hosts=(1,))
        models.Job.objects.create(plan=self._plan,
                                  test_config=test_config,
                                  afe_job=afe_job)

        self.assertEqual(
                None, rpc_utils.compute_next_test_config(self._plan,
                                                         self._planner_host))
        self.assertFalse(self._planner_host.complete)

        # Every config has now been run: nothing is next, host is complete.
        hqe = afe_job.hostqueueentry_set.all()[0]
        hqe.status = host_queue_entry_states.Status.COMPLETED
        hqe.save()

        self.assertEqual(
                None, rpc_utils.compute_next_test_config(self._plan,
                                                         self._planner_host))
        self.assertTrue(self._planner_host.complete)

    def test_process_failure(self):
        """Check that process_failure() records all triage data.

        The host and test action handlers are stubbed out; only their
        invocation with the expected arguments is verified here.
        """
        self._setup_active_plan()
        tko_test = tko_models.Test.objects.create(job=self._tko_job,
                                                  machine=self._tko_machine,
                                                  kernel=self._tko_kernel,
                                                  status=self._running_status)
        failure = models.TestRun.objects.create(
                plan=self._plan,
                test_job=self._planner_job,
                tko_test=tko_test,
                host=self._planner_host,
                status=model_attributes.TestRunStatus.FAILED,
                finalized=True, seen=False, triaged=False)
        host_action = failure_actions.HostAction.UNBLOCK
        test_action = failure_actions.TestAction.SKIP
        labels = ['label1', 'label2']
        keyvals = {'key1': 'value1',
                   'key2': 'value2'}
        bugs = ['bug1', 'bug2']
        reason = 'overriden reason'
        invalidate = True

        self.god.stub_function(rpc_utils, '_process_host_action')
        self.god.stub_function(rpc_utils, '_process_test_action')

        rpc_utils._process_host_action.expect_call(self._planner_host,
                                                   host_action)
        rpc_utils._process_test_action.expect_call(self._planner_job,
                                                   test_action)

        rpc_utils.process_failure(
                failure_id=failure.id, host_action=host_action,
                test_action=test_action, labels=labels, keyvals=keyvals,
                bugs=bugs, reason=reason, invalidate=invalidate)

        # Re-fetch so the assertions see what was actually persisted.
        failure = models.TestRun.objects.get(id=failure.id)

        self.assertEqual(
                set(failure.tko_test.testlabel_set.all()),
                set(tko_models.TestLabel.objects.filter(name__in=labels)))
        self.assertEqual(
                set(failure.tko_test.job.jobkeyval_set.all()),
                set(tko_models.JobKeyval.objects.filter(
                        key__in=list(keyvals))))
        self.assertEqual(set(failure.bugs.all()),
                         set(models.Bug.objects.filter(external_uid__in=bugs)))
        self.assertEqual(failure.tko_test.reason, reason)
        self.assertEqual(failure.invalidated, invalidate)
        self.assertTrue(failure.seen)
        self.assertTrue(failure.triaged)
        self.god.check_playback()

    def _replace_site_process_host_action(self, replacement):
        """Stub utils.import_site_function to hand back `replacement`.

        Installs a mock for utils.import_site_function and queues one
        expectation that accepts any arguments and returns `replacement`.
        """
        self.god.stub_function(utils, 'import_site_function')
        utils.import_site_function.expect_any_call().and_return(replacement)

    def _remove_site_process_host_action(self):
        """Substitute a site host-action handler that always returns False."""
        def _site_process_host_action_dummy(host, action):
            return False
        self._replace_site_process_host_action(_site_process_host_action_dummy)

    def test_process_host_action_block(self):
        """Check that the BLOCK action persists blocked=True on the host."""
        self._remove_site_process_host_action()
        host = models.Host.objects.create(plan=self._plan, host=self.hosts[0],
                                          blocked=False)
        # Fixture precondition; a bare assert would vanish under python -O.
        self.assertFalse(host.blocked)

        rpc_utils._process_host_action(host, failure_actions.HostAction.BLOCK)

        host = models.Host.objects.get(id=host.id)

        self.assertTrue(host.blocked)
        self.god.check_playback()

    def test_process_host_action_unblock(self):
        """Check that the UNBLOCK action persists blocked=False on the host."""
        self._remove_site_process_host_action()
        host = models.Host.objects.create(plan=self._plan, host=self.hosts[0],
                                          blocked=True)
        # Fixture precondition; a bare assert would vanish under python -O.
        self.assertTrue(host.blocked)

        rpc_utils._process_host_action(host, failure_actions.HostAction.UNBLOCK)

        host = models.Host.objects.get(id=host.id)

        self.assertFalse(host.blocked)
        self.god.check_playback()

    def test_process_host_action_site(self):
        """Check handling of an action only a site handler could recognise.

        First the site handler is replaced by one that returns False, and
        an AssertionError is expected for the unknown action. Then a handler
        that returns True is supplied, and the call must succeed and invoke
        that handler.
        """
        # This has to be an actual call. A bare attribute reference does
        # nothing and would leave the real utils.import_site_function in
        # place, making the first half of the test depend on whether a site
        # module happens to be installed.
        self._remove_site_process_host_action()

        action = object()
        host_action_values = failure_actions.HostAction.values
        host_action_values.append(action)
        try:
            host = models.Host.objects.create(plan=self._plan,
                                              host=self.hosts[0])

            self.assertRaises(AssertionError, rpc_utils._process_host_action,
                              host, action)
            self.god.check_playback()

            self._called = False
            def _site_process_host_action(host, action):
                self._called = True
                return True

            # utils.import_site_function is already stubbed by the helper
            # call above, so only queue a new expectation rather than
            # stubbing the same symbol a second time.
            utils.import_site_function.expect_any_call().and_return(
                    _site_process_host_action)
            rpc_utils._process_host_action(host, action)

            self.assertTrue(self._called)
            self.god.check_playback()
        finally:
            # HostAction.values is shared module state; do not leak the
            # sentinel action into other tests.
            host_action_values.remove(action)

    def test_process_test_action_skip(self):
        """Check that the SKIP action leaves requires_rerun unset."""
        self._setup_active_plan()
        planner_job = self._planner_job
        # Fixture precondition; a bare assert would vanish under python -O.
        self.assertFalse(planner_job.requires_rerun)

        rpc_utils._process_test_action(planner_job,
                                       failure_actions.TestAction.SKIP)

        planner_job = models.Job.objects.get(id=planner_job.id)

        self.assertFalse(planner_job.requires_rerun)

    def test_process_test_action_rerun(self):
        """Check that the RERUN action persists requires_rerun=True."""
        self._setup_active_plan()
        planner_job = self._planner_job
        # Fixture precondition; a bare assert would vanish under python -O.
        self.assertFalse(planner_job.requires_rerun)

        rpc_utils._process_test_action(planner_job,
                                       failure_actions.TestAction.RERUN)

        planner_job = models.Job.objects.get(id=planner_job.id)

        self.assertTrue(planner_job.requires_rerun)

    def test_set_additional_parameters(self):
        """Check that one parameter spec is stored with repr()'d values."""
        hostname_regex = 'host[0-9]'
        param_type = model_attributes.AdditionalParameterType.VERIFY
        param_values = {'key1': 'value1',
                        'key2': []}

        additional_parameters = {'hostname_regex': hostname_regex,
                                 'param_type': param_type,
                                 'param_values': param_values}

        rpc_utils.set_additional_parameters(self._plan, [additional_parameters])

        additional_parameters_query = (
                models.AdditionalParameter.objects.filter(plan=self._plan))
        self.assertEqual(additional_parameters_query.count(), 1)

        additional_parameter = additional_parameters_query[0]
        self.assertEqual(additional_parameter.hostname_regex, hostname_regex)
        self.assertEqual(additional_parameter.param_type, param_type)
        self.assertEqual(additional_parameter.application_order, 0)

        values_query = additional_parameter.additionalparametervalue_set.all()
        self.assertEqual(values_query.count(), 2)

        value_query1 = values_query.filter(key='key1')
        value_query2 = values_query.filter(key='key2')
        self.assertEqual(value_query1.count(), 1)
        self.assertEqual(value_query2.count(), 1)

        # Values are expected to be persisted as their repr() strings.
        self.assertEqual(value_query1[0].value, repr('value1'))
        self.assertEqual(value_query2[0].value, repr([]))

    def test_get_wrap_arguments(self):
        """Check that stored values of a matching parameter are returned."""
        hostname_regex = '.*'
        param_type = model_attributes.AdditionalParameterType.VERIFY

        additional_param = models.AdditionalParameter.objects.create(
                plan=self._plan, hostname_regex=hostname_regex,
                param_type=param_type, application_order=0)
        models.AdditionalParameterValue.objects.create(
                additional_parameter=additional_param,
                key='key1', value=repr('value1'))
        models.AdditionalParameterValue.objects.create(
                additional_parameter=additional_param,
                key='key2', value=repr([]))

        actual = rpc_utils.get_wrap_arguments(self._plan, 'host', param_type)
        # The result maps each key to the stored (repr'd) string as-is.
        expected = {'key1': repr('value1'),
                    'key2': repr([])}

        self.assertEqual(actual, expected)

    def test_compute_test_config_status_scheduled(self):
        """Expect SCHEDULED once the planner job has been deleted."""
        self._setup_active_plan()
        self._planner_job.delete()

        self.assertEqual(
                rpc_utils.compute_test_config_status(self._planner_host),
                rpc_utils.ComputeTestConfigStatusResult.SCHEDULED)

    def test_compute_test_config_status_running(self):
        """Expect RUNNING while Job.active() reports True."""
        self._setup_active_plan()
        self.god.stub_function(models.Job, 'active')
        models.Job.active.expect_call().and_return(True)

        self.assertEqual(
                rpc_utils.compute_test_config_status(self._planner_host),
                rpc_utils.ComputeTestConfigStatusResult.RUNNING)
        self.god.check_playback()

    def test_compute_test_config_status_good(self):
        """Expect PASS for a complete host whose test run has good status."""
        self._setup_active_plan()
        tko_test = self._tko_job.test_set.create(kernel=self._tko_kernel,
                                                 status=self._good_status,
                                                 machine=self._tko_machine)
        self._plan.testrun_set.create(test_job=self._planner_job,
                                      tko_test=tko_test,
                                      host=self._planner_host)
        self._planner_host.complete = True
        self._planner_host.save()
        self.god.stub_function(models.Job, 'active')
        models.Job.active.expect_call().and_return(False)

        self.assertEqual(
                rpc_utils.compute_test_config_status(self._planner_host),
                rpc_utils.ComputeTestConfigStatusResult.PASS)
        self.god.check_playback()

    def test_compute_test_config_status_bad(self):
        """Expect FAIL for a complete host whose test run has fail status."""
        self._setup_active_plan()
        tko_test = self._tko_job.test_set.create(kernel=self._tko_kernel,
                                                 status=self._fail_status,
                                                 machine=self._tko_machine)
        self._plan.testrun_set.create(test_job=self._planner_job,
                                      tko_test=tko_test,
                                      host=self._planner_host)
        self._planner_host.complete = True
        self._planner_host.save()
        self.god.stub_function(models.Job, 'active')
        models.Job.active.expect_call().and_return(False)

        self.assertEqual(
                rpc_utils.compute_test_config_status(self._planner_host),
                rpc_utils.ComputeTestConfigStatusResult.FAIL)
        self.god.check_playback()
# Run every test in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()