blob: 0958c81ea024a2442cfb662ab374e5b5d1a64302 [file] [log] [blame]
import atexit, datetime, os, tempfile, unittest
import common
from autotest_lib.frontend import setup_test_environment
from autotest_lib.frontend import thread_local
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib.test_utils import mock
class FrontendTestMixin(object):
def _fill_in_test_data(self):
"""Populate the test database with some hosts and labels."""
if models.DroneSet.drone_sets_enabled():
models.DroneSet.objects.create(
name=models.DroneSet.default_drone_set_name())
acl_group = models.AclGroup.objects.create(name='my_acl')
acl_group.users.add(models.User.current_user())
self.hosts = [models.Host.objects.create(hostname=hostname)
for hostname in
('host1', 'host2', 'host3', 'host4', 'host5', 'host6',
'host7', 'host8', 'host9')]
acl_group.hosts = self.hosts
models.AclGroup.smart_get('Everyone').hosts = []
self.labels = [models.Label.objects.create(name=name) for name in
('label1', 'label2', 'label3', 'label4', 'label5',
'label6', 'label7', 'label8', 'unused')]
platform = models.Label.objects.create(name='myplatform', platform=True)
for host in self.hosts:
host.labels.add(platform)
atomic_group1 = models.AtomicGroup.objects.create(
name='atomic1', max_number_of_machines=2)
atomic_group2 = models.AtomicGroup.objects.create(
name='atomic2', max_number_of_machines=2)
self.label3 = self.labels[2]
self.label3.only_if_needed = True
self.label3.save()
self.label4 = self.labels[3]
self.label4.atomic_group = atomic_group1
self.label4.save()
self.label5 = self.labels[4]
self.label5.atomic_group = atomic_group1
self.label5.save()
self.hosts[0].labels.add(self.labels[0]) # label1
self.hosts[1].labels.add(self.labels[1]) # label2
self.label6 = self.labels[5]
self.label7 = self.labels[6]
self.label8 = self.labels[7]
self.label8.atomic_group = atomic_group2
self.label8.save()
for hostnum in xrange(4,7): # host5..host7
self.hosts[hostnum].labels.add(self.label4) # an atomic group lavel
self.hosts[hostnum].labels.add(self.label6) # a normal label
self.hosts[6].labels.add(self.label7)
for hostnum in xrange(7,9): # host8..host9
self.hosts[hostnum].labels.add(self.label5) # an atomic group lavel
self.hosts[hostnum].labels.add(self.label6) # a normal label
self.hosts[hostnum].labels.add(self.label7)
def _frontend_common_setup(self, fill_data=True, setup_tables=True):
self.god = mock.mock_god(ut=self)
if setup_tables:
setup_test_environment.set_up()
global_config.global_config.override_config_value(
'AUTOTEST_WEB', 'parameterized_jobs', 'False')
global_config.global_config.override_config_value(
'SERVER', 'rpc_logging', 'False')
if fill_data and setup_tables:
self._fill_in_test_data()
def _frontend_common_teardown(self):
setup_test_environment.tear_down()
thread_local.set_user(None)
self.god.unstub_all()
def _create_job(self, hosts=[], metahosts=[], priority=0, active=False,
synchronous=False, atomic_group=None, hostless=False,
drone_set=None, control_file='control',
parameterized_job=None, owner='autotest_system',
parent_job_id=None):
"""
Create a job row in the test database.
@param hosts - A list of explicit host ids for this job to be
scheduled on.
@param metahosts - A list of label ids for each host that this job
should be scheduled on (meta host scheduling).
@param priority - The job priority (integer).
@param active - bool, mark this job as running or not in the database?
@param synchronous - bool, if True use synch_count=2 otherwise use
synch_count=1.
@param atomic_group - An atomic group id for this job to schedule on
or None if atomic scheduling is not required. Each metahost
becomes a request to schedule an entire atomic group.
This does not support creating an active atomic group job.
@param hostless - if True, this job is intended to be hostless (in that
case, hosts, metahosts, and atomic_group must all be empty)
@param owner - The owner of the job. Aclgroups from which a job can
acquire hosts change with the aclgroups of the owners.
@param parent_job_id - The id of a parent_job. If a job with the id
doesn't already exist one will be created.
@raises model.DoesNotExist: If parent_job_id is specified but a job with
id=parent_job_id does not exist.
@returns A Django frontend.afe.models.Job instance.
"""
if not drone_set:
drone_set = (models.DroneSet.default_drone_set_name()
and models.DroneSet.get_default())
assert not (atomic_group and active) # TODO(gps): support this
synch_count = synchronous and 2 or 1
created_on = datetime.datetime(2008, 1, 1)
status = models.HostQueueEntry.Status.QUEUED
if active:
status = models.HostQueueEntry.Status.RUNNING
parent_job = (models.Job.objects.get(id=parent_job_id)
if parent_job_id else None)
job = models.Job.objects.create(
name='test', owner=owner, priority=priority,
synch_count=synch_count, created_on=created_on,
reboot_before=model_attributes.RebootBefore.NEVER,
drone_set=drone_set, control_file=control_file,
parameterized_job=parameterized_job, parent_job=parent_job)
# Update the job's dependencies to include the metahost.
for metahost_label in metahosts:
dep = models.Label.objects.get(id=metahost_label)
job.dependency_labels.add(dep)
for host_id in hosts:
models.HostQueueEntry.objects.create(job=job, host_id=host_id,
status=status,
atomic_group_id=atomic_group)
models.IneligibleHostQueue.objects.create(job=job, host_id=host_id)
for label_id in metahosts:
models.HostQueueEntry.objects.create(job=job, meta_host_id=label_id,
status=status,
atomic_group_id=atomic_group)
if atomic_group and not (metahosts or hosts):
# Create a single HQE to request the atomic group of hosts even if
# no metahosts or hosts are supplied.
models.HostQueueEntry.objects.create(job=job,
status=status,
atomic_group_id=atomic_group)
if hostless:
assert not (hosts or metahosts or atomic_group)
models.HostQueueEntry.objects.create(job=job, status=status)
return job
def _create_job_simple(self, hosts, use_metahost=False,
priority=0, active=False, drone_set=None,
parent_job_id=None):
"""An alternative interface to _create_job"""
args = {'hosts' : [], 'metahosts' : []}
if use_metahost:
args['metahosts'] = hosts
else:
args['hosts'] = hosts
return self._create_job(
priority=priority, active=active, drone_set=drone_set,
parent_job_id=parent_job_id, **args)