blob: b2980296b3263016327ff5e0d9d8dedde28b02f1 [file] [log] [blame]
#!/usr/bin/python
import datetime, unittest
import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import frontend_test_utils
from django.db import connection
from autotest_lib.frontend.afe import models, rpc_interface, frontend_test_utils
from autotest_lib.frontend.afe import model_logic
# Short alias for models.HostQueueEntry.Status; the tests below use it to set
# queue entry statuses (e.g. _hqe_status.FAILED).
_hqe_status = models.HostQueueEntry.Status
class RpcInterfaceTest(unittest.TestCase,
                       frontend_test_utils.FrontendTestMixin):
    """Tests for the functions exposed by the AFE rpc_interface module.

    Fixture objects used below (self.hosts, self.label3, 'host1',
    'myplatform', 'label1', ...) are not created in this class; they are
    presumably set up by FrontendTestMixin._frontend_common_setup().
    """

    def setUp(self):
        """Build the shared frontend test fixture before each test."""
        self._frontend_common_setup()


    def tearDown(self):
        """Tear down the shared frontend test fixture after each test."""
        self._frontend_common_teardown()


    def test_validation(self):
        # non-number for a numeric field
        self.assertRaises(model_logic.ValidationError,
                          rpc_interface.add_atomic_group, name='foo',
                          max_number_of_machines='bar')
        # omit a required field
        self.assertRaises(model_logic.ValidationError, rpc_interface.add_label,
                          name=None)
        # violate uniqueness constraint ('host1' is expected to exist already
        # in the fixture)
        self.assertRaises(model_logic.ValidationError, rpc_interface.add_host,
                          hostname='host1')


    def test_multiple_platforms(self):
        # Create a second platform label; only its existence in the database
        # matters, so the returned object is not kept.
        models.Label.objects.create(name='platform2', platform=True)
        # Attaching the second platform must be rejected from both directions.
        self.assertRaises(model_logic.ValidationError,
                          rpc_interface.label_add_hosts, 'platform2',
                          ['host1', 'host2'])
        self.assertRaises(model_logic.ValidationError,
                          rpc_interface.host_add_labels, 'host1', ['platform2'])
        # make sure the platform didn't get added
        platforms = rpc_interface.get_labels(
            host__hostname__in=['host1', 'host2'], platform=True)
        self.assertEqual(len(platforms), 1)
        self.assertEqual(platforms[0]['name'], 'myplatform')


    def _check_hostnames(self, hosts, expected_hostnames):
        """Assert that `hosts` contains exactly the expected hostnames.

        @param hosts: iterable of dicts, each with a 'hostname' key.
        @param expected_hostnames: iterable of hostname strings.

        Both sides are compared as sets, so order and duplicates are ignored.
        """
        self.assertEqual(set(host['hostname'] for host in hosts),
                         set(expected_hostnames))


    def test_get_hosts(self):
        # No filters: every fixture host is returned.
        hosts = rpc_interface.get_hosts()
        self._check_hostnames(hosts, [host.hostname for host in self.hosts])

        # Filtering on hostname narrows the result to that host.
        hosts = rpc_interface.get_hosts(hostname='host1')
        self._check_hostnames(hosts, ['host1'])


    def test_get_hosts_multiple_labels(self):
        hosts = rpc_interface.get_hosts(
            multiple_labels=['myplatform', 'label1'])
        self._check_hostnames(hosts, ['host1'])


    def test_get_hosts_exclude_only_if_needed(self):
        # label3 is presumably an only_if_needed label in the fixture -- TODO
        # confirm against frontend_test_utils.
        self.hosts[0].labels.add(self.label3)

        hosts = rpc_interface.get_hosts(hostname__in=['host1', 'host2'],
                                        exclude_only_if_needed_labels=True)
        self._check_hostnames(hosts, ['host2'])


    def test_get_jobs_summary(self):
        job = self._create_job(hosts=xrange(1, 4))
        entries = list(job.hostqueueentry_set.all())
        # Leave entries[0] untouched, fail entries[1], and mark entries[2] as
        # both failed and aborted.
        entries[1].status = _hqe_status.FAILED
        entries[1].save()
        entries[2].status = _hqe_status.FAILED
        entries[2].aborted = True
        entries[2].save()

        job_summaries = rpc_interface.get_jobs_summary(id=job.id)
        self.assertEqual(len(job_summaries), 1)
        summary = job_summaries[0]
        # The aborted entry is still expected to be counted under 'Failed'.
        self.assertEqual(summary['status_counts'], {'Queued': 1,
                                                    'Failed': 2})


    def _create_job_helper(self, **kwargs):
        """Call rpc_interface.create_job() with fixed positional arguments.

        @param kwargs: extra keyword arguments forwarded to create_job().
        @returns whatever rpc_interface.create_job() returns.
        """
        return rpc_interface.create_job('test', 'Medium', 'control file',
                                        'Server', **kwargs)


    def test_one_time_hosts(self):
        # Only the side effect of job creation (the new host row) is checked,
        # so the return value is not kept.
        self._create_job_helper(one_time_hosts=['testhost'])
        host = models.Host.objects.get(hostname='testhost')
        self.assertEqual(host.invalid, True)
        self.assertEqual(host.labels.count(), 0)
        self.assertEqual(host.aclgroup_set.count(), 0)


    def _setup_special_tasks(self):
        """Create two jobs and three verify tasks on the first fixture host.

        Stores the tasks as self.task1 (complete, started before job 1),
        self.task2 (active, attached to job 2's queue entry) and self.task3
        (no start time, not yet run).
        """
        host = self.hosts[0]

        job1 = self._create_job(hosts=[1])
        job2 = self._create_job(hosts=[1])

        entry1 = job1.hostqueueentry_set.all()[0]
        entry1.update_object(started_on=datetime.datetime(2009, 1, 2),
                             execution_subdir='1-myuser/host1')
        entry2 = job2.hostqueueentry_set.all()[0]
        entry2.update_object(started_on=datetime.datetime(2009, 1, 3),
                             execution_subdir='2-myuser/host1')

        self.task1 = models.SpecialTask.objects.create(
            host=host, task=models.SpecialTask.Task.VERIFY,
            time_started=datetime.datetime(2009, 1, 1), # ran before job 1
            is_complete=True)
        self.task2 = models.SpecialTask.objects.create(
            host=host, task=models.SpecialTask.Task.VERIFY,
            queue_entry=entry2, # ran with job 2
            is_active=True)
        self.task3 = models.SpecialTask.objects.create(
            host=host, task=models.SpecialTask.Task.VERIFY) # not yet run


    def test_get_special_tasks(self):
        self._setup_special_tasks()
        # task2 is attached to a queue entry, so only task1 and task3 match.
        tasks = rpc_interface.get_special_tasks(host__hostname='host1',
                                                queue_entry__isnull=True)
        self.assertEqual(len(tasks), 2)
        self.assertEqual(tasks[0]['task'], models.SpecialTask.Task.VERIFY)
        self.assertEqual(tasks[0]['is_active'], False)
        self.assertEqual(tasks[0]['is_complete'], True)


    def test_get_latest_special_task(self):
        # a particular usage of get_special_tasks()
        self._setup_special_tasks()
        # Give task2 a start time later than task1's so it is the latest
        # started task (task3 has no start time and is filtered out).
        self.task2.time_started = datetime.datetime(2009, 1, 2)
        self.task2.save()

        tasks = rpc_interface.get_special_tasks(
            host__hostname='host1', task=models.SpecialTask.Task.VERIFY,
            time_started__isnull=False, sort_by=['-time_started'],
            query_limit=1)
        self.assertEqual(len(tasks), 1)
        # Compare against the object's real id rather than a hard-coded
        # primary key value.
        self.assertEqual(tasks[0]['id'], self.task2.id)


    def _common_entry_check(self, entry_dict):
        """Assert that an entry dict refers to host1 and to job id 2.

        @param entry_dict: dict with nested 'host' and 'job' dicts, as found in
                the list returned by
                rpc_interface.get_host_queue_entries_and_special_tasks().

        NOTE(review): the job id 2 assumes the second job created by
        _setup_special_tasks() receives id 2 -- confirm ids start at 1 in the
        test database.
        """
        self.assertEqual(entry_dict['host']['hostname'], 'host1')
        self.assertEqual(entry_dict['job']['id'], 2)


    def test_get_host_queue_entries_and_special_tasks(self):
        self._setup_special_tasks()

        entries_and_tasks = (
            rpc_interface.get_host_queue_entries_and_special_tasks('host1'))

        # Expected ordering of the interleaved queue entries and verify tasks.
        paths = [entry['execution_path'] for entry in entries_and_tasks]
        self.assertEqual(paths, ['hosts/host1/3-verify',
                                 '2-myuser/host1',
                                 'hosts/host1/2-verify',
                                 '1-myuser/host1',
                                 'hosts/host1/1-verify'])

        # The active verify task that ran with job 2.
        verify2 = entries_and_tasks[2]
        self._common_entry_check(verify2)
        self.assertEqual(verify2['type'], 'Verify')
        self.assertEqual(verify2['status'], 'Running')
        self.assertEqual(verify2['execution_path'], 'hosts/host1/2-verify')

        # The queue entry belonging to job 2.
        entry2 = entries_and_tasks[1]
        self._common_entry_check(entry2)
        self.assertEqual(entry2['type'], 'Job')
        self.assertEqual(entry2['status'], 'Queued')
        self.assertEqual(entry2['started_on'], '2009-01-03 00:00:00')
# Run the test suite only when this file is executed directly as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    unittest.main()