blob: 9ba13cb9930b576ce05769827a8c82afa3e34b54 [file] [log] [blame]
#!/usr/bin/python
import datetime, unittest
import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import frontend_test_utils
from django.db import connection
from autotest_lib.frontend.afe import models, rpc_interface, frontend_test_utils
from autotest_lib.frontend.afe import model_logic
_hqe_status = models.HostQueueEntry.Status
class RpcInterfaceTest(unittest.TestCase,
frontend_test_utils.FrontendTestMixin):
def setUp(self):
self._frontend_common_setup()
def tearDown(self):
self._frontend_common_teardown()
def test_validation(self):
# non-number for a numeric field
self.assertRaises(model_logic.ValidationError,
rpc_interface.add_atomic_group, name='foo',
max_number_of_machines='bar')
# omit a required field
self.assertRaises(model_logic.ValidationError, rpc_interface.add_label,
name=None)
# violate uniqueness constraint
self.assertRaises(model_logic.ValidationError, rpc_interface.add_host,
hostname='host1')
def test_multiple_platforms(self):
platform2 = models.Label.objects.create(name='platform2', platform=True)
self.assertRaises(model_logic.ValidationError,
rpc_interface. label_add_hosts, 'platform2',
['host1', 'host2'])
self.assertRaises(model_logic.ValidationError,
rpc_interface.host_add_labels, 'host1', ['platform2'])
# make sure the platform didn't get added
platforms = rpc_interface.get_labels(
host__hostname__in=['host1', 'host2'], platform=True)
self.assertEquals(len(platforms), 1)
self.assertEquals(platforms[0]['name'], 'myplatform')
def _check_hostnames(self, hosts, expected_hostnames):
self.assertEquals(set(host['hostname'] for host in hosts),
set(expected_hostnames))
def test_get_hosts(self):
hosts = rpc_interface.get_hosts()
self._check_hostnames(hosts, [host.hostname for host in self.hosts])
hosts = rpc_interface.get_hosts(hostname='host1')
self._check_hostnames(hosts, ['host1'])
host = hosts[0]
self.assertEquals(sorted(host['labels']), ['label1', 'myplatform'])
self.assertEquals(host['platform'], 'myplatform')
self.assertEquals(host['atomic_group'], None)
self.assertEquals(host['acls'], ['my_acl'])
self.assertEquals(host['attributes'], {})
def test_get_hosts_multiple_labels(self):
hosts = rpc_interface.get_hosts(
multiple_labels=['myplatform', 'label1'])
self._check_hostnames(hosts, ['host1'])
def test_get_hosts_exclude_only_if_needed(self):
self.hosts[0].labels.add(self.label3)
hosts = rpc_interface.get_hosts(hostname__in=['host1', 'host2'],
exclude_only_if_needed_labels=True)
self._check_hostnames(hosts, ['host2'])
def test_get_hosts_exclude_atomic_group_hosts(self):
hosts = rpc_interface.get_hosts(
exclude_atomic_group_hosts=True,
hostname__in=['host4', 'host5', 'host6'])
self._check_hostnames(hosts, ['host4'])
def test_get_hosts_exclude_both(self):
self.hosts[0].labels.add(self.label3)
hosts = rpc_interface.get_hosts(
hostname__in=['host1', 'host2', 'host5'],
exclude_only_if_needed_labels=True,
exclude_atomic_group_hosts=True)
self._check_hostnames(hosts, ['host2'])
def test_job_keyvals(self):
keyval_dict = {'mykey': 'myvalue'}
job_id = rpc_interface.create_job(name='test', priority='Medium',
control_file='foo',
control_type='Client',
hosts=['host1'],
keyvals=keyval_dict)
jobs = rpc_interface.get_jobs(id=job_id)
self.assertEquals(len(jobs), 1)
self.assertEquals(jobs[0]['keyvals'], keyval_dict)
def test_get_jobs_summary(self):
job = self._create_job(hosts=xrange(1, 4))
entries = list(job.hostqueueentry_set.all())
entries[1].status = _hqe_status.FAILED
entries[1].save()
entries[2].status = _hqe_status.FAILED
entries[2].aborted = True
entries[2].save()
job_summaries = rpc_interface.get_jobs_summary(id=job.id)
self.assertEquals(len(job_summaries), 1)
summary = job_summaries[0]
self.assertEquals(summary['status_counts'], {'Queued': 1,
'Failed': 2})
def test_get_jobs_filters(self):
HqeStatus = models.HostQueueEntry.Status
def create_two_host_job():
return self._create_job(hosts=[1, 2])
def set_hqe_statuses(job, first_status, second_status):
entries = job.hostqueueentry_set.all()
entries[0].update_object(status=first_status)
entries[1].update_object(status=second_status)
queued = create_two_host_job()
queued_and_running = create_two_host_job()
set_hqe_statuses(queued_and_running, HqeStatus.QUEUED,
HqeStatus.RUNNING)
running_and_complete = create_two_host_job()
set_hqe_statuses(running_and_complete, HqeStatus.RUNNING,
HqeStatus.COMPLETED)
complete = create_two_host_job()
set_hqe_statuses(complete, HqeStatus.COMPLETED, HqeStatus.COMPLETED)
started_but_inactive = create_two_host_job()
set_hqe_statuses(started_but_inactive, HqeStatus.QUEUED,
HqeStatus.COMPLETED)
parsing = create_two_host_job()
set_hqe_statuses(parsing, HqeStatus.PARSING, HqeStatus.PARSING)
def check_job_ids(actual_job_dicts, expected_jobs):
self.assertEquals(
set(job_dict['id'] for job_dict in actual_job_dicts),
set(job.id for job in expected_jobs))
check_job_ids(rpc_interface.get_jobs(not_yet_run=True), [queued])
check_job_ids(rpc_interface.get_jobs(running=True),
[queued_and_running, running_and_complete,
started_but_inactive, parsing])
check_job_ids(rpc_interface.get_jobs(finished=True), [complete])
def _create_job_helper(self, **kwargs):
return rpc_interface.create_job('test', 'Medium', 'control file',
'Server', **kwargs)
def test_one_time_hosts(self):
job = self._create_job_helper(one_time_hosts=['testhost'])
host = models.Host.objects.get(hostname='testhost')
self.assertEquals(host.invalid, True)
self.assertEquals(host.labels.count(), 0)
self.assertEquals(host.aclgroup_set.count(), 0)
def test_create_job_duplicate_hosts(self):
self.assertRaises(model_logic.ValidationError, self._create_job_helper,
hosts=[1, 1])
def test_create_hostless_job(self):
job_id = self._create_job_helper(hostless=True)
job = models.Job.objects.get(pk=job_id)
queue_entries = job.hostqueueentry_set.all()
self.assertEquals(len(queue_entries), 1)
self.assertEquals(queue_entries[0].host, None)
self.assertEquals(queue_entries[0].meta_host, None)
self.assertEquals(queue_entries[0].atomic_group, None)
def _setup_special_tasks(self):
host = self.hosts[0]
job1 = self._create_job(hosts=[1])
job2 = self._create_job(hosts=[1])
entry1 = job1.hostqueueentry_set.all()[0]
entry1.update_object(started_on=datetime.datetime(2009, 1, 2),
execution_subdir='host1')
entry2 = job2.hostqueueentry_set.all()[0]
entry2.update_object(started_on=datetime.datetime(2009, 1, 3),
execution_subdir='host1')
self.task1 = models.SpecialTask.objects.create(
host=host, task=models.SpecialTask.Task.VERIFY,
time_started=datetime.datetime(2009, 1, 1), # ran before job 1
is_complete=True)
self.task2 = models.SpecialTask.objects.create(
host=host, task=models.SpecialTask.Task.VERIFY,
queue_entry=entry2, # ran with job 2
is_active=True)
self.task3 = models.SpecialTask.objects.create(
host=host, task=models.SpecialTask.Task.VERIFY) # not yet run
def test_get_special_tasks(self):
self._setup_special_tasks()
tasks = rpc_interface.get_special_tasks(host__hostname='host1',
queue_entry__isnull=True)
self.assertEquals(len(tasks), 2)
self.assertEquals(tasks[0]['task'], models.SpecialTask.Task.VERIFY)
self.assertEquals(tasks[0]['is_active'], False)
self.assertEquals(tasks[0]['is_complete'], True)
def test_get_latest_special_task(self):
# a particular usage of get_special_tasks()
self._setup_special_tasks()
self.task2.time_started = datetime.datetime(2009, 1, 2)
self.task2.save()
tasks = rpc_interface.get_special_tasks(
host__hostname='host1', task=models.SpecialTask.Task.VERIFY,
time_started__isnull=False, sort_by=['-time_started'],
query_limit=1)
self.assertEquals(len(tasks), 1)
self.assertEquals(tasks[0]['id'], 2)
def _common_entry_check(self, entry_dict):
self.assertEquals(entry_dict['host']['hostname'], 'host1')
self.assertEquals(entry_dict['job']['id'], 2)
def test_get_host_queue_entries_and_special_tasks(self):
self._setup_special_tasks()
entries_and_tasks = (
rpc_interface.get_host_queue_entries_and_special_tasks('host1'))
paths = [entry['execution_path'] for entry in entries_and_tasks]
self.assertEquals(paths, ['hosts/host1/3-verify',
'2-autotest_system/host1',
'hosts/host1/2-verify',
'1-autotest_system/host1',
'hosts/host1/1-verify'])
verify2 = entries_and_tasks[2]
self._common_entry_check(verify2)
self.assertEquals(verify2['type'], 'Verify')
self.assertEquals(verify2['status'], 'Running')
self.assertEquals(verify2['execution_path'], 'hosts/host1/2-verify')
entry2 = entries_and_tasks[1]
self._common_entry_check(entry2)
self.assertEquals(entry2['type'], 'Job')
self.assertEquals(entry2['status'], 'Queued')
self.assertEquals(entry2['started_on'], '2009-01-03 00:00:00')
def test_view_invalid_host(self):
# RPCs used by View Host page should work for invalid hosts
self._create_job_helper(hosts=[1])
self.hosts[0].delete()
self.assertEquals(1, rpc_interface.get_num_hosts(hostname='host1',
valid_only=False))
data = rpc_interface.get_hosts(hostname='host1', valid_only=False)
self.assertEquals(1, len(data))
self.assertEquals(1, rpc_interface.get_num_host_queue_entries(
host__hostname='host1'))
data = rpc_interface.get_host_queue_entries(host__hostname='host1')
self.assertEquals(1, len(data))
count = rpc_interface.get_num_host_queue_entries_and_special_tasks(
hostname='host1')
self.assertEquals(1, count)
data = rpc_interface.get_host_queue_entries_and_special_tasks(
hostname='host1')
self.assertEquals(1, len(data))
def test_reverify_hosts(self):
hostname_list = rpc_interface.reverify_hosts(id__in=[1, 2])
self.assertEquals(hostname_list, ['host1', 'host2'])
tasks = rpc_interface.get_special_tasks()
self.assertEquals(len(tasks), 2)
self.assertEquals(set(task['host']['id'] for task in tasks),
set([1, 2]))
task = tasks[0]
self.assertEquals(task['task'], models.SpecialTask.Task.VERIFY)
self.assertEquals(task['requested_by'], 'autotest_system')
if __name__ == '__main__':
unittest.main()