| #!/usr/bin/python |
| #pylint: disable-msg=C0111 |
| |
| import gc, time |
| import common |
| from autotest_lib.frontend import setup_django_environment |
| from autotest_lib.frontend.afe import frontend_test_utils |
| from autotest_lib.client.common_lib.test_utils import mock |
| from autotest_lib.client.common_lib.test_utils import unittest |
| from autotest_lib.database import database_connection |
| from autotest_lib.frontend.afe import models |
| from autotest_lib.scheduler import agent_task |
| from autotest_lib.scheduler import monitor_db, drone_manager, email_manager |
| from autotest_lib.scheduler import pidfile_monitor |
| from autotest_lib.scheduler import scheduler_config, gc_stats, host_scheduler |
| from autotest_lib.scheduler import monitor_db_functional_test |
| from autotest_lib.scheduler import monitor_db_unittest |
| from autotest_lib.scheduler import scheduler_models |
| |
| _DEBUG = False |
| |
| |
| class AtomicGroupTest(monitor_db_unittest.DispatcherSchedulingTest): |
| |
| def test_atomic_group_hosts_blocked_from_non_atomic_jobs(self): |
| # Create a job scheduled to run on label6. |
| self._create_job(metahosts=[self.label6.id]) |
| self._run_scheduler() |
| # label6 only has hosts that are in atomic groups associated with it, |
| # there should be no scheduling. |
| self._check_for_extra_schedulings() |
| |
| |
| def test_atomic_group_hosts_blocked_from_non_atomic_jobs_explicit(self): |
| # Create a job scheduled to run on label5. This is an atomic group |
| # label but this job does not request atomic group scheduling. |
| self._create_job(metahosts=[self.label5.id]) |
| self._run_scheduler() |
| # label6 only has hosts that are in atomic groups associated with it, |
| # there should be no scheduling. |
| self._check_for_extra_schedulings() |
| |
| |
| def test_atomic_group_scheduling_basics(self): |
| # Create jobs scheduled to run on an atomic group. |
| job_a = self._create_job(synchronous=True, metahosts=[self.label4.id], |
| atomic_group=1) |
| job_b = self._create_job(synchronous=True, metahosts=[self.label5.id], |
| atomic_group=1) |
| self._run_scheduler() |
| # atomic_group.max_number_of_machines was 2 so we should run on 2. |
| self._assert_job_scheduled_on_number_of(job_a.id, (5, 6, 7), 2) |
| self._assert_job_scheduled_on(job_b.id, 8) # label5 |
| self._assert_job_scheduled_on(job_b.id, 9) # label5 |
| self._check_for_extra_schedulings() |
| |
| # The three host label4 atomic group still has one host available. |
| # That means a job with a synch_count of 1 asking to be scheduled on |
| # the atomic group can still use the final machine. |
| # |
| # This may seem like a somewhat odd use case. It allows the use of an |
| # atomic group as a set of machines to run smaller jobs within (a set |
| # of hosts configured for use in network tests with eachother perhaps?) |
| onehost_job = self._create_job(atomic_group=1) |
| self._run_scheduler() |
| self._assert_job_scheduled_on_number_of(onehost_job.id, (5, 6, 7), 1) |
| self._check_for_extra_schedulings() |
| |
| # No more atomic groups have hosts available, no more jobs should |
| # be scheduled. |
| self._create_job(atomic_group=1) |
| self._run_scheduler() |
| self._check_for_extra_schedulings() |
| |
| |
| def test_atomic_group_scheduling_obeys_acls(self): |
| # Request scheduling on a specific atomic label but be denied by ACLs. |
| self._do_query('DELETE FROM afe_acl_groups_hosts ' |
| 'WHERE host_id in (8,9)') |
| job = self._create_job(metahosts=[self.label5.id], atomic_group=1) |
| self._run_scheduler() |
| self._check_for_extra_schedulings() |
| |
| |
| def test_atomic_group_scheduling_dependency_label_exclude(self): |
| # A dependency label that matches no hosts in the atomic group. |
| job_a = self._create_job(atomic_group=1) |
| job_a.dependency_labels.add(self.label3) |
| self._run_scheduler() |
| self._check_for_extra_schedulings() |
| |
| |
| def test_atomic_group_scheduling_metahost_dependency_label_exclude(self): |
| # A metahost and dependency label that excludes too many hosts. |
| job_b = self._create_job(synchronous=True, metahosts=[self.label4.id], |
| atomic_group=1) |
| job_b.dependency_labels.add(self.label7) |
| self._run_scheduler() |
| self._check_for_extra_schedulings() |
| |
| |
| def test_atomic_group_scheduling_dependency_label_match(self): |
| # A dependency label that exists on enough atomic group hosts in only |
| # one of the two atomic group labels. |
| job_c = self._create_job(synchronous=True, atomic_group=1) |
| job_c.dependency_labels.add(self.label7) |
| self._run_scheduler() |
| self._assert_job_scheduled_on_number_of(job_c.id, (8, 9), 2) |
| self._check_for_extra_schedulings() |
| |
| |
| def test_atomic_group_scheduling_no_metahost(self): |
| # Force it to schedule on the other group for a reliable test. |
| self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9') |
| # An atomic job without a metahost. |
| job = self._create_job(synchronous=True, atomic_group=1) |
| self._run_scheduler() |
| self._assert_job_scheduled_on_number_of(job.id, (5, 6, 7), 2) |
| self._check_for_extra_schedulings() |
| |
| |
| def test_atomic_group_scheduling_partial_group(self): |
| # Make one host in labels[3] unavailable so that there are only two |
| # hosts left in the group. |
| self._do_query('UPDATE afe_hosts SET status="Repair Failed" WHERE id=5') |
| job = self._create_job(synchronous=True, metahosts=[self.label4.id], |
| atomic_group=1) |
| self._run_scheduler() |
| # Verify that it was scheduled on the 2 ready hosts in that group. |
| self._assert_job_scheduled_on(job.id, 6) |
| self._assert_job_scheduled_on(job.id, 7) |
| self._check_for_extra_schedulings() |
| |
| |
| def test_atomic_group_scheduling_not_enough_available(self): |
| # Mark some hosts in each atomic group label as not usable. |
| # One host running, another invalid in the first group label. |
| self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=5') |
| self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=6') |
| # One host invalid in the second group label. |
| self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9') |
| # Nothing to schedule when no group label has enough (2) good hosts.. |
| self._create_job(atomic_group=1, synchronous=True) |
| self._run_scheduler() |
| # There are not enough hosts in either atomic group, |
| # No more scheduling should occur. |
| self._check_for_extra_schedulings() |
| |
| # Now create an atomic job that has a synch count of 1. It should |
| # schedule on exactly one of the hosts. |
| onehost_job = self._create_job(atomic_group=1) |
| self._run_scheduler() |
| self._assert_job_scheduled_on_number_of(onehost_job.id, (7, 8), 1) |
| |
| |
| def test_atomic_group_scheduling_no_valid_hosts(self): |
| self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id in (8,9)') |
| self._create_job(synchronous=True, metahosts=[self.label5.id], |
| atomic_group=1) |
| self._run_scheduler() |
| # no hosts in the selected group and label are valid. no schedulings. |
| self._check_for_extra_schedulings() |
| |
| |
| def test_atomic_group_scheduling_metahost_works(self): |
| # Test that atomic group scheduling also obeys metahosts. |
| self._create_job(metahosts=[0], atomic_group=1) |
| self._run_scheduler() |
| # There are no atomic group hosts that also have that metahost. |
| self._check_for_extra_schedulings() |
| |
| job_b = self._create_job(metahosts=[self.label5.id], atomic_group=1) |
| self._run_scheduler() |
| self._assert_job_scheduled_on(job_b.id, 8) |
| self._assert_job_scheduled_on(job_b.id, 9) |
| self._check_for_extra_schedulings() |
| |
| |
| def test_atomic_group_skips_ineligible_hosts(self): |
| # Test hosts marked ineligible for this job are not eligible. |
| # How would this ever happen anyways? |
| job = self._create_job(metahosts=[self.label4.id], atomic_group=1) |
| models.IneligibleHostQueue.objects.create(job=job, host_id=5) |
| models.IneligibleHostQueue.objects.create(job=job, host_id=6) |
| models.IneligibleHostQueue.objects.create(job=job, host_id=7) |
| self._run_scheduler() |
| # No scheduling should occur as all desired hosts were ineligible. |
| self._check_for_extra_schedulings() |
| |
| |
| def test_atomic_group_scheduling_fail(self): |
| # If synch_count is > the atomic group number of machines, the job |
| # should be aborted immediately. |
| model_job = self._create_job(synchronous=True, atomic_group=1) |
| model_job.synch_count = 4 |
| model_job.save() |
| job = scheduler_models.Job(id=model_job.id) |
| self._run_scheduler() |
| self._check_for_extra_schedulings() |
| queue_entries = job.get_host_queue_entries() |
| self.assertEqual(1, len(queue_entries)) |
| self.assertEqual(queue_entries[0].status, |
| models.HostQueueEntry.Status.ABORTED) |
| |
| |
| def test_atomic_group_no_labels_no_scheduling(self): |
| # Never schedule on atomic groups marked invalid. |
| job = self._create_job(metahosts=[self.label5.id], synchronous=True, |
| atomic_group=1) |
| # Deleting an atomic group via the frontend marks it invalid and |
| # removes all label references to the group. The job now references |
| # an invalid atomic group with no labels associated with it. |
| self.label5.atomic_group.invalid = True |
| self.label5.atomic_group.save() |
| self.label5.atomic_group = None |
| self.label5.save() |
| |
| self._run_scheduler() |
| self._check_for_extra_schedulings() |
| |
| |
| def test_schedule_directly_on_atomic_group_host_fail(self): |
| # Scheduling a job directly on hosts in an atomic group must |
| # fail to avoid users inadvertently holding up the use of an |
| # entire atomic group by using the machines individually. |
| job = self._create_job(hosts=[5]) |
| self._run_scheduler() |
| self._check_for_extra_schedulings() |
| |
| |
| def test_schedule_directly_on_atomic_group_host(self): |
| # Scheduling a job directly on one host in an atomic group will |
| # work when the atomic group is listed on the HQE in addition |
| # to the host (assuming the sync count is 1). |
| job = self._create_job(hosts=[5], atomic_group=1) |
| self._run_scheduler() |
| self._assert_job_scheduled_on(job.id, 5) |
| self._check_for_extra_schedulings() |
| |
| |
| def test_schedule_directly_on_atomic_group_hosts_sync2(self): |
| job = self._create_job(hosts=[5,8], atomic_group=1, synchronous=True) |
| self._run_scheduler() |
| self._assert_job_scheduled_on(job.id, 5) |
| self._assert_job_scheduled_on(job.id, 8) |
| self._check_for_extra_schedulings() |
| |
| |
| def test_schedule_directly_on_atomic_group_hosts_wrong_group(self): |
| job = self._create_job(hosts=[5,8], atomic_group=2, synchronous=True) |
| self._run_scheduler() |
| self._check_for_extra_schedulings() |
| |
| |
| # TODO(gps): These should probably live in their own TestCase class |
| # specific to testing HostScheduler methods directly. It was convenient |
| # to put it here for now to share existing test environment setup code. |
| def test_HostScheduler_check_atomic_group_labels(self): |
| normal_job = self._create_job(metahosts=[0]) |
| atomic_job = self._create_job(atomic_group=1) |
| # Indirectly initialize the internal state of the host scheduler. |
| self._dispatcher._refresh_pending_queue_entries() |
| |
| atomic_hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%d' % |
| atomic_job.id)[0] |
| normal_hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%d' % |
| normal_job.id)[0] |
| |
| host_scheduler = self._dispatcher._host_scheduler |
| self.assertTrue(host_scheduler._check_atomic_group_labels( |
| [self.label4.id], atomic_hqe)) |
| self.assertFalse(host_scheduler._check_atomic_group_labels( |
| [self.label4.id], normal_hqe)) |
| self.assertFalse(host_scheduler._check_atomic_group_labels( |
| [self.label5.id, self.label6.id, self.label7.id], normal_hqe)) |
| self.assertTrue(host_scheduler._check_atomic_group_labels( |
| [self.label4.id, self.label6.id], atomic_hqe)) |
| self.assertTrue(host_scheduler._check_atomic_group_labels( |
| [self.label4.id, self.label5.id], |
| atomic_hqe)) |
| |
| |
| class OnlyIfNeededTest(monitor_db_unittest.DispatcherSchedulingTest): |
| |
| def _setup_test_only_if_needed_labels(self): |
| # apply only_if_needed label3 to host1 |
| models.Host.smart_get('host1').labels.add(self.label3) |
| return self._create_job_simple([1], use_metahost=True) |
| |
| |
| def test_only_if_needed_labels_avoids_host(self): |
| job = self._setup_test_only_if_needed_labels() |
| # if the job doesn't depend on label3, there should be no scheduling |
| self._run_scheduler() |
| self._check_for_extra_schedulings() |
| |
| |
| def test_only_if_needed_labels_schedules(self): |
| job = self._setup_test_only_if_needed_labels() |
| job.dependency_labels.add(self.label3) |
| self._run_scheduler() |
| self._assert_job_scheduled_on(1, 1) |
| self._check_for_extra_schedulings() |
| |
| |
| def test_only_if_needed_labels_via_metahost(self): |
| job = self._setup_test_only_if_needed_labels() |
| job.dependency_labels.add(self.label3) |
| # should also work if the metahost is the only_if_needed label |
| self._do_query('DELETE FROM afe_jobs_dependency_labels') |
| self._create_job(metahosts=[3]) |
| self._run_scheduler() |
| self._assert_job_scheduled_on(2, 1) |
| self._check_for_extra_schedulings() |
| |
| |
| def test_metahosts_obey_blocks(self): |
| """ |
| Metahosts can't get scheduled on hosts already scheduled for |
| that job. |
| """ |
| self._create_job(metahosts=[1], hosts=[1]) |
| # make the nonmetahost entry complete, so the metahost can try |
| # to get scheduled |
| self._update_hqe(set='complete = 1', where='host_id=1') |
| self._run_scheduler() |
| self._check_for_extra_schedulings() |
| |
| |