[autotest] Split host acquisition and job scheduling.
This is phase one of two in the plan to split host acquisition out of the
scheduler's tick. The idea is to have the host scheduler use a job query
manager to query the database for new jobs without hosts and assign
hosts to them, while the main scheduler uses the same query managers to
look for hostless jobs.
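As a rough sketch of the split (illustrative only: the helper functions
below are placeholders, while get_pending_queue_entries(only_hostless=...)
is the query manager call the new unittests exercise):

    # Illustrative sketch; only get_pending_queue_entries() is taken from
    # the query manager interface exercised by the unittests below.
    def entries_needing_hosts(job_query_manager):
        # Host scheduler side: pull every pending queue entry, including
        # new jobs that still need a host assigned.
        return job_query_manager.get_pending_queue_entries(
                only_hostless=False)

    def hostless_entries(job_query_manager):
        # Main scheduler side: only pull hostless jobs, which never go
        # through host acquisition.
        return job_query_manager.get_pending_queue_entries(
                only_hostless=True)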
Currently the main scheduler uses the host scheduler class to acquire
hosts inline, as it always has, and will continue to do so until the
inline_host_acquisition feature flag is turned off via the shadow_config.
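For reference, a minimal sketch of how such a flag can be read (the
SCHEDULER section name and the default are assumptions here; only the
global_config API, which also honors shadow_config overrides, is taken
as given):

    from autotest_lib.client.common_lib import global_config

    # Assumed section and default; values in shadow_config override
    # global_config.ini when this is read.
    inline_host_acquisition = global_config.global_config.get_config_value(
            'SCHEDULER', 'inline_host_acquisition', type=bool, default=True)

    if inline_host_acquisition:
        # The main scheduler keeps acquiring hosts inline in its tick.
        pass
    else:
        # Host acquisition is left to the standalone host scheduler.
        pass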
TEST=Ran the scheduler, suites, unittests.
BUG=chromium:344613
DEPLOY=Scheduler
Change-Id: I542e4d1e509c16cac7354810416ee18ac940a7cf
Reviewed-on: https://chromium-review.googlesource.com/199383
Reviewed-by: Prashanth B <beeps@chromium.org>
Commit-Queue: Prashanth B <beeps@chromium.org>
Tested-by: Prashanth B <beeps@chromium.org>
diff --git a/scheduler/host_scheduler_unittests.py b/scheduler/host_scheduler_unittests.py
new file mode 100644
index 0000000..3c7637b
--- /dev/null
+++ b/scheduler/host_scheduler_unittests.py
@@ -0,0 +1,150 @@
+#!/usr/bin/python
+#pylint: disable-msg=C0111
+
+# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import mock
+
+import common
+
+from autotest_lib.client.common_lib.test_utils import unittest
+from autotest_lib.frontend import setup_django_environment
+from autotest_lib.frontend.afe import frontend_test_utils
+from autotest_lib.frontend.afe import models
+from autotest_lib.scheduler import rdb
+from autotest_lib.scheduler import rdb_lib
+from autotest_lib.scheduler import rdb_testing_utils
+
+
+class QueryManagerTests(rdb_testing_utils.AbstractBaseRDBTester,
+                        unittest.TestCase):
+    """Verify scheduler behavior through the job and host query managers."""
+
+    _config_section = 'AUTOTEST_WEB'
+
+
+    def testPendingQueueEntries(self):
+        """Test retrieval of pending queue entries."""
+        job = self.create_job(deps=set(['a']))
+
+        # Check that we don't pull the job we just created with
+        # only_hostless=True, since it still needs a host.
+        hostless_jobs = self.job_query_manager.get_pending_queue_entries(
+                only_hostless=True)
+        self.assertTrue(len(hostless_jobs) == 0)
+
+        # Check that only_hostless=False pulls new jobs, as always.
+        jobs_without_hosts = self.job_query_manager.get_pending_queue_entries(
+                only_hostless=False)
+        self.assertTrue(jobs_without_hosts[0].id == job.id and
+                        jobs_without_hosts[0].host_id is None)
+
+
+    def testHostQueries(self):
+        """Verify that the job query manager returns the right job info."""
+        # Create a job and use the job query manager's internal data
+        # structures to retrieve its job info.
+        job = self.create_job(
+                deps=rdb_testing_utils.DEFAULT_DEPS,
+                acls=rdb_testing_utils.DEFAULT_ACLS)
+        queue_entries = self._dispatcher._refresh_pending_queue_entries()
+        job_manager = rdb_lib.JobQueryManager(queue_entries)
+        job_info = job_manager.get_job_info(queue_entries[0])
+        default_dep_ids = set([label.id for label in self.db_helper.get_labels(
+                name__in=rdb_testing_utils.DEFAULT_DEPS)])
+        default_acl_ids = set([acl.id for acl in self.db_helper.get_acls(
+                name__in=rdb_testing_utils.DEFAULT_ACLS)])
+        self.assertTrue(set(job_info['deps']) == default_dep_ids)
+        self.assertTrue(set(job_info['acls']) == default_acl_ids)
+
+
+    def testNewJobsWithHosts(self):
+        """Test that we handle inactive hqes with unleased hosts correctly."""
+        # Create a job and assign it an unleased host, then check that the
+        # HQE becomes active and the host remains assigned to it.
+        job = self.create_job(deps=['a'])
+        host = self.db_helper.create_host('h1', deps=['a'])
+        self.db_helper.add_host_to_job(host, job.id)
+
+        queue_entries = self._dispatcher._refresh_pending_queue_entries()
+        self._dispatcher._schedule_new_jobs()
+
+        host = self.db_helper.get_host(hostname='h1')[0]
+        self.assertTrue(host.leased == True and
+                        host.status == models.Host.Status.READY)
+        hqes = list(self.db_helper.get_hqes(host_id=host.id))
+        self.assertTrue(len(hqes) == 1 and hqes[0].active and
+                        hqes[0].status == models.HostQueueEntry.Status.QUEUED)
+
+
+    def testNewJobsWithInvalidHost(self):
+        """Test handling of inactive hqes assigned invalid, unleased hosts."""
+        # Create a job and assign it an unleased host with mismatched deps,
+        # then check that the HQE DOES NOT become active, because we
+        # validate the assignment again.
+        job = self.create_job(deps=['a'])
+        host = self.db_helper.create_host('h1', deps=['b'])
+        self.db_helper.add_host_to_job(host, job.id)
+
+        queue_entries = self._dispatcher._refresh_pending_queue_entries()
+        self._dispatcher._schedule_new_jobs()
+
+        host = self.db_helper.get_host(hostname='h1')[0]
+        self.assertTrue(host.leased == False and
+                        host.status == models.Host.Status.READY)
+        hqes = list(self.db_helper.get_hqes(host_id=host.id))
+        self.assertTrue(len(hqes) == 1 and not hqes[0].active and
+                        hqes[0].status == models.HostQueueEntry.Status.QUEUED)
+
+
+    def testNewJobsWithLeasedHost(self):
+        """Test handling of inactive hqes assigned leased hosts."""
+        # Create a job and assign it a leased host, then check that the
+        # HQE does not become active through the scheduler, and that the
+        # host gets released.
+        job = self.create_job(deps=['a'])
+        host = self.db_helper.create_host('h1', deps=['b'])
+        self.db_helper.add_host_to_job(host, job.id)
+        host.leased = 1
+        host.save()
+
+        rdb.batch_acquire_hosts = mock.MagicMock()
+        queue_entries = self._dispatcher._refresh_pending_queue_entries()
+        self._dispatcher._schedule_new_jobs()
+        self.assertTrue(rdb.batch_acquire_hosts.call_count == 0)
+        host = self.db_helper.get_host(hostname='h1')[0]
+        self.assertTrue(host.leased == True and
+                        host.status == models.Host.Status.READY)
+        hqes = list(self.db_helper.get_hqes(host_id=host.id))
+        self.assertTrue(len(hqes) == 1 and not hqes[0].active and
+                        hqes[0].status == models.HostQueueEntry.Status.QUEUED)
+        self.host_scheduler._release_hosts()
+        self.assertTrue(self.db_helper.get_host(hostname='h1')[0].leased == 0)
+
+
+    def testSpecialTaskOrdering(self):
+        """Test priority ordering of special tasks."""
+
+        # Create 2 special tasks, one with and one without an hqe, and check
+        # that the task without an hqe is prioritized first. Then assign the
+        # same host to another active hqe and make sure we don't try
+        # scheduling either of these special tasks.
+        host = self.db_helper.create_host('h1', deps=['a'])
+        task1 = self.db_helper.create_special_task(host_id=host.id)
+        job = self.create_job(deps=['a'])
+        self.db_helper.add_host_to_job(host, job.id)
+        hqe = self.db_helper.get_hqes(job=job.id)[0]
+        task2 = self.db_helper.create_special_task(job.id)
+        tasks = self.job_query_manager.get_prioritized_special_tasks()
+        self.assertTrue(tasks[0].queue_entry_id is None and
+                        tasks[1].queue_entry_id == hqe.id)
+
+        job2 = self.create_job(deps=['a'])
+        self.db_helper.add_host_to_job(host, job2.id)
+        hqe2 = self.db_helper.get_hqes(job=job2.id)[0]
+        hqe2.status = models.HostQueueEntry.Status.RUNNING
+        hqe2.save()
+        tasks = self.job_query_manager.get_prioritized_special_tasks()
+        self.assertTrue(tasks == [])
+
+