First iteration of pluggable metahost handlers.  This change adds the basic framework and moves the default, label-based metahost assignment code into a handler.  It includes some refactorings to the basic scheduling code to make things a bit cleaner.

Signed-off-by: Steve Howard <showard@google.com>


git-svn-id: http://test.kernel.org/svn/autotest/trunk@4232 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/scheduler/metahost_scheduler.py b/scheduler/metahost_scheduler.py
new file mode 100644
index 0000000..9588e95
--- /dev/null
+++ b/scheduler/metahost_scheduler.py
@@ -0,0 +1,108 @@
+from autotest_lib.client.common_lib import utils
+
+class HostSchedulingUtility(object):
+    """Interface to host availability information from the scheduler."""
+    def hosts_in_label(self, label_id):
+        """Return potentially usable hosts with the given label."""
+        raise NotImplementedError
+
+
+    def remove_host_from_label(self, host_id, label_id):
+        """Remove this host from the internal list of usable hosts in the label.
+
+        This is provided as an optimization -- when code gets a host from a
+        label and concludes it's unusable, it can call this to avoid getting
+        that host again in the future (within this tick).  This function should
+        not affect correctness.
+        """
+        raise NotImplementedError
+
+
+    def pop_host(self, host_id):
+        """Remove and return a host from the internal list of available hosts.
+        """
+        raise NotImplementedError
+
+
+    def ineligible_hosts_for_entry(self, queue_entry):
+        """Get the list of hosts ineligible to run the given queue entry."""
+        raise NotImplementedError
+
+
+    def is_host_usable(self, host_id):
+        """Determine if the host is currently usable at all."""
+        raise NotImplementedError
+
+
+    def is_host_eligible_for_job(self, host_id, queue_entry):
+        """Determine if the host is eligible specifically for this queue entry.
+
+        @param queue_entry: a HostQueueEntry DBObject
+        """
+        raise NotImplementedError
+
+
+class MetahostScheduler(object):
+    def can_schedule_metahost(self, queue_entry):
+        """Return true if this object can schedule the given queue entry.
+
+        At most one MetahostScheduler should return true for any given entry.
+
+        @param queue_entry: a HostQueueEntry DBObject
+        """
+        raise NotImplementedError
+
+
+    def schedule_metahost(self, queue_entry, scheduling_utility):
+         """Schedule the given queue entry, if possible.
+
+         This method should make necessary database changes culminating in
+         assigning a host to the given queue entry in the database.  It may
+         take no action if no host can be assigned currently.
+
+         @param queue_entry: a HostQueueEntry DBObject
+         @param scheduling_utility: a HostSchedulingUtility object
+         """
+         raise NotImplementedError
+
+
+    def recovery_on_startup(self):
+        """Perform any necessary recovery upon scheduler startup."""
+        pass
+
+
+    def tick(self):
+        """Called once per scheduler cycle; any actions are allowed."""
+        pass
+
+
+class LabelMetahostScheduler(MetahostScheduler):
+    def can_schedule_metahost(self, queue_entry):
+        return bool(queue_entry.meta_host)
+
+
+    def schedule_metahost(self, queue_entry, scheduling_utility):
+        label_id = queue_entry.meta_host
+        hosts_in_label = scheduling_utility.hosts_in_label(label_id)
+        ineligible_host_ids = scheduling_utility.ineligible_hosts_for_entry(
+                queue_entry)
+
+        for host_id in hosts_in_label:
+            if not scheduling_utility.is_host_usable(host_id):
+                scheduling_utility.remove_host_from_label(host_id, label_id)
+                continue
+            if host_id in ineligible_host_ids:
+                continue
+            if not scheduling_utility.is_host_eligible_for_job(host_id,
+                                                               queue_entry):
+                continue
+
+            # Remove the host from our cached internal state before returning
+            scheduling_utility.remove_host_from_label(host_id, label_id)
+            host = scheduling_utility.pop_host(host_id)
+            queue_entry.set_host(host)
+            return
+
+
+def get_metahost_schedulers():
+    return [LabelMetahostScheduler()]
diff --git a/scheduler/metahost_scheduler_unittest.py b/scheduler/metahost_scheduler_unittest.py
new file mode 100644
index 0000000..5933769
--- /dev/null
+++ b/scheduler/metahost_scheduler_unittest.py
@@ -0,0 +1,74 @@
+#!/usr/bin/python
+
+import common
+import unittest
+from autotest_lib.client.common_lib.test_utils import mock
+from autotest_lib.scheduler import metahost_scheduler, monitor_db
+
+class LabelMetahostSchedulerTest(unittest.TestCase):
+    def setUp(self):
+        self.god = mock.mock_god()
+        self.scheduling_utility = self.god.create_mock_class(
+                metahost_scheduler.HostSchedulingUtility, 'utility')
+        self.metahost_scheduler = metahost_scheduler.LabelMetahostScheduler()
+
+
+    def entry(self):
+        return self.god.create_mock_class(monitor_db.HostQueueEntry, 'entry')
+
+
+    def test_can_schedule_metahost(self):
+        entry = self.entry()
+        entry.meta_host = None
+        self.assertFalse(self.metahost_scheduler.can_schedule_metahost(entry))
+
+        entry.meta_host = 1
+        self.assert_(self.metahost_scheduler.can_schedule_metahost(entry))
+
+
+    def test_schedule_metahost(self):
+        entry = self.entry()
+        entry.meta_host = 1
+        host = object()
+
+        self.scheduling_utility.hosts_in_label.expect_call(1).and_return(
+                [2, 3, 4, 5])
+        # 2 is in ineligible_hosts
+        (self.scheduling_utility.ineligible_hosts_for_entry.expect_call(entry)
+         .and_return([2]))
+        self.scheduling_utility.is_host_usable.expect_call(2).and_return(True)
+        # 3 is unusable
+        self.scheduling_utility.is_host_usable.expect_call(3).and_return(False)
+        self.scheduling_utility.remove_host_from_label.expect_call(3, 1)
+        # 4 is ineligible for the job
+        self.scheduling_utility.is_host_usable.expect_call(4).and_return(True)
+        (self.scheduling_utility.is_host_eligible_for_job.expect_call(4, entry)
+         .and_return(False))
+        # 5 runs
+        self.scheduling_utility.is_host_usable.expect_call(5).and_return(True)
+        (self.scheduling_utility.is_host_eligible_for_job.expect_call(5, entry)
+         .and_return(True))
+        self.scheduling_utility.remove_host_from_label.expect_call(5, 1)
+        self.scheduling_utility.pop_host.expect_call(5).and_return(host)
+        entry.set_host.expect_call(host)
+
+        self.metahost_scheduler.schedule_metahost(entry,
+                                                  self.scheduling_utility)
+        self.god.check_playback()
+
+
+    def test_no_hosts(self):
+        entry = self.entry()
+        entry.meta_host = 1
+
+        self.scheduling_utility.hosts_in_label.expect_call(1).and_return(())
+        (self.scheduling_utility.ineligible_hosts_for_entry.expect_call(entry)
+         .and_return(()))
+
+        self.metahost_scheduler.schedule_metahost(entry,
+                                                  self.scheduling_utility)
+        self.god.check_playback()
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index eb21a56..464ad5e 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -24,7 +24,7 @@
 from autotest_lib.scheduler import drone_manager, drones, email_manager
 from autotest_lib.scheduler import monitor_db_cleanup
 from autotest_lib.scheduler import status_server, scheduler_config
-from autotest_lib.scheduler import gc_stats
+from autotest_lib.scheduler import gc_stats, metahost_scheduler
 
 BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
 PID_FILE_PREFIX = 'monitor_db'
@@ -78,6 +78,11 @@
     return {}
 
 
+get_metahost_schedulers = utils.import_site_function(
+        __file__, 'autotest_lib.scheduler.site_metahost_scheduler',
+        'get_metahost_schedulers', metahost_scheduler.get_metahost_schedulers)
+
+
 def main():
     try:
         try:
@@ -260,7 +265,22 @@
     """Raised by HostScheduler when an inconsistent state occurs."""
 
 
-class HostScheduler(object):
+class HostScheduler(metahost_scheduler.HostSchedulingUtility):
+    """Handles the logic for choosing when to run jobs and on which hosts.
+
+    This class makes several queries to the database on each tick, building up
+    some auxiliary data structures and using them to determine which hosts are
+    eligible to run which jobs, taking into account all the various factors that
+    affect that.
+
+    In the past this was done with one or two very large, complex database
+    queries.  It has proven much simpler and faster to build these auxiliary
+    data structures and perform the logic in Python.
+    """
+    def __init__(self):
+        self._metahost_schedulers = get_metahost_schedulers()
+
+
     def _get_ready_hosts(self):
         # avoid any host with a currently active queue entry against it
         hosts = Host.fetch(
@@ -378,6 +398,32 @@
         self._labels = self._get_labels()
 
 
+    def hosts_in_label(self, label_id):
+        """Return potentially usable hosts with the given label."""
+        return set(self._label_hosts.get(label_id, ()))
+
+
+    def remove_host_from_label(self, host_id, label_id):
+        """Remove this host from the internal list of usable hosts in the label.
+
+        This is provided as an optimization -- when code gets a host from a
+        label and concludes it's unusable, it can call this to avoid getting
+        that host again in the future (within this tick).
+        """
+        self._label_hosts[label_id].remove(host_id)
+
+
+    def pop_host(self, host_id):
+        """Remove and return a host from the internal list of available hosts.
+        """
+        return self._hosts_available.pop(host_id)
+
+
+    def ineligible_hosts_for_entry(self, queue_entry):
+        """Get the list of hosts ineligible to run the given queue entry."""
+        return set(self._ineligible_hosts.get(queue_entry.job_id, ()))
+
+
     def _is_acl_accessible(self, host_id, queue_entry):
         job_acls = self._job_acls.get(queue_entry.job_id, set())
         host_acls = self._host_acls.get(host_id, set())
@@ -470,11 +516,11 @@
                 supplied queue_entry.
         """
         return set(host_id for host_id in group_hosts
-                   if self._is_host_usable(host_id)
-                   and self._is_host_eligible_for_job(host_id, queue_entry))
+                   if self.is_host_usable(host_id)
+                   and self.is_host_eligible_for_job(host_id, queue_entry))
 
 
-    def _is_host_eligible_for_job(self, host_id, queue_entry):
+    def is_host_eligible_for_job(self, host_id, queue_entry):
         if self._is_host_invalid(host_id):
             # if an invalid host is scheduled for a job, it's a one-time host
             # and it therefore bypasses eligibility checks. note this can only
@@ -498,12 +544,12 @@
 
 
     def _schedule_non_metahost(self, queue_entry):
-        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
+        if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry):
             return None
         return self._hosts_available.pop(queue_entry.host_id, None)
 
 
-    def _is_host_usable(self, host_id):
+    def is_host_usable(self, host_id):
         if host_id not in self._hosts_available:
             # host was already used during this scheduling cycle
             return False
@@ -514,35 +560,16 @@
         return True
 
 
-    def _schedule_metahost(self, queue_entry):
-        label_id = queue_entry.meta_host
-        hosts_in_label = self._label_hosts.get(label_id, set())
-        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
-                                                         set())
-
-        # must iterate over a copy so we can mutate the original while iterating
-        for host_id in list(hosts_in_label):
-            if not self._is_host_usable(host_id):
-                hosts_in_label.remove(host_id)
-                continue
-            if host_id in ineligible_host_ids:
-                continue
-            if not self._is_host_eligible_for_job(host_id, queue_entry):
-                continue
-
-            # Remove the host from our cached internal state before returning
-            # the host object.
-            hosts_in_label.remove(host_id)
-            return self._hosts_available.pop(host_id)
-        return None
-
-
-    def find_eligible_host(self, queue_entry):
-        if not queue_entry.meta_host:
-            assert queue_entry.host_id is not None
+    def schedule_entry(self, queue_entry):
+        if queue_entry.host_id is not None:
             return self._schedule_non_metahost(queue_entry)
-        assert queue_entry.atomic_group_id is None
-        return self._schedule_metahost(queue_entry)
+
+        for scheduler in self._metahost_schedulers:
+            if scheduler.can_schedule_metahost(queue_entry):
+                scheduler.schedule_metahost(queue_entry, self)
+                return None
+
+        raise SchedulerError('No metahost scheduler to handle %s' % queue_entry)
 
 
     def find_eligible_atomic_group(self, queue_entry):
@@ -580,14 +607,13 @@
                 job.id, job.synch_count, atomic_group.id,
                  atomic_group.max_number_of_machines, queue_entry.id)
             return []
-        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
-        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
-                                                         set())
+        hosts_in_label = self.hosts_in_label(queue_entry.meta_host)
+        ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry)
 
         # Look in each label associated with atomic_group until we find one with
         # enough hosts to satisfy the job.
         for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
-            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
+            group_hosts = set(self.hosts_in_label(atomic_label_id))
             if queue_entry.meta_host is not None:
                 # If we have a metahost label, only allow its hosts.
                 group_hosts.intersection_update(hosts_in_label)
@@ -1041,17 +1067,17 @@
         logging.info('Expanding atomic group entry %s with hosts %s',
                      queue_entry,
                      ', '.join(host.hostname for host in group_hosts))
-        # The first assigned host uses the original HostQueueEntry
-        group_queue_entries = [queue_entry]
+
         for assigned_host in group_hosts[1:]:
             # Create a new HQE for every additional assigned_host.
             new_hqe = HostQueueEntry.clone(queue_entry)
             new_hqe.save()
-            group_queue_entries.append(new_hqe)
-        assert len(group_queue_entries) == len(group_hosts)
-        for queue_entry, host in itertools.izip(group_queue_entries,
-                                                group_hosts):
-            self._run_queue_entry(queue_entry, host)
+            new_hqe.set_host(assigned_host)
+            self._run_queue_entry(new_hqe)
+
+        # The first assigned host uses the original HostQueueEntry
+        queue_entry.set_host(group_hosts[0])
+        self._run_queue_entry(queue_entry)
 
 
     def _schedule_hostless_job(self, queue_entry):
@@ -1067,15 +1093,16 @@
             is_unassigned_atomic_group = (
                     queue_entry.atomic_group_id is not None
                     and queue_entry.host_id is None)
-            if is_unassigned_atomic_group:
-                self._schedule_atomic_group(queue_entry)
-            elif queue_entry.is_hostless():
+
+            if queue_entry.is_hostless():
                 self._schedule_hostless_job(queue_entry)
+            elif is_unassigned_atomic_group:
+                self._schedule_atomic_group(queue_entry)
             else:
-                assigned_host = self._host_scheduler.find_eligible_host(
-                        queue_entry)
+                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                 if assigned_host and not self.host_has_agent(assigned_host):
-                    self._run_queue_entry(queue_entry, assigned_host)
+                    assert assigned_host.id == queue_entry.host_id
+                    self._run_queue_entry(queue_entry)
 
 
     def _schedule_running_host_queue_entries(self):
@@ -1091,8 +1118,8 @@
                 self.add_agent_task(task)
 
 
-    def _run_queue_entry(self, queue_entry, host):
-        queue_entry.schedule_pre_job_tasks(assigned_host=host)
+    def _run_queue_entry(self, queue_entry):
+        queue_entry.schedule_pre_job_tasks()
 
 
     def _find_aborting(self):
@@ -2907,11 +2934,12 @@
 
     def set_host(self, host):
         if host:
+            logging.info('Assigning host %s to entry %s', host.hostname, self)
             self.queue_log_record('Assigning host ' + host.hostname)
             self.update_field('host_id', host.id)
-            self.update_field('active', True)
             self.block_host(host.id)
         else:
+            logging.info('Releasing host from %s', self)
             self.queue_log_record('Releasing host')
             self.unblock_host(self.host.id)
             self.update_field('host_id', None)
@@ -3039,15 +3067,7 @@
         email_manager.manager.send_email(self.job.email_list, subject, body)
 
 
-    def schedule_pre_job_tasks(self, assigned_host=None):
-        if self.meta_host is not None or self.atomic_group:
-            assert assigned_host
-            # ensure results dir exists for the queue log
-            if self.host_id is None:
-                self.set_host(assigned_host)
-            else:
-                assert assigned_host.id == self.host_id
-
+    def schedule_pre_job_tasks(self):
         logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                      self.job.name, self.meta_host, self.atomic_group_id,
                      self.job.id, self.id, self.host.hostname, self.status)
diff --git a/scheduler/monitor_db_functional_test.py b/scheduler/monitor_db_functional_test.py
index 6d0de07..687a2ef 100755
--- a/scheduler/monitor_db_functional_test.py
+++ b/scheduler/monitor_db_functional_test.py
@@ -968,7 +968,7 @@
         self._run_dispatcher() # delay task started waiting
 
         self.mock_drone_manager.finish_specific_process(
-                'hosts/host5/1-verify', monitor_db._AUTOSERV_PID_FILE)
+                'hosts/host6/1-verify', monitor_db._AUTOSERV_PID_FILE)
         self._run_dispatcher() # job starts now
         for entry in queue_entries:
             self._check_statuses(entry, HqeStatus.RUNNING, HostStatus.RUNNING)
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py
index ac8240b..0e1d66e 100755
--- a/scheduler/monitor_db_unittest.py
+++ b/scheduler/monitor_db_unittest.py
@@ -286,11 +286,16 @@
         self._jobs_scheduled = []
 
 
+    def _run_scheduler(self):
+        for _ in xrange(2): # metahost scheduling can take two cycles
+            self._dispatcher._schedule_new_jobs()
+
+
     def _test_basic_scheduling_helper(self, use_metahosts):
         'Basic nonmetahost scheduling'
         self._create_job_simple([1], use_metahosts)
         self._create_job_simple([2], use_metahosts)
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._assert_job_scheduled_on(1, 1)
         self._assert_job_scheduled_on(2, 2)
         self._check_for_extra_schedulings()
@@ -302,7 +307,7 @@
         self._create_job_simple([2], use_metahosts)
         self._create_job_simple([1,2], use_metahosts)
         self._create_job_simple([1], use_metahosts, priority=1)
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._assert_job_scheduled_on(4, 1) # higher priority
         self._assert_job_scheduled_on(2, 2) # earlier job over later
         self._check_for_extra_schedulings()
@@ -315,17 +320,17 @@
         """
         self._create_job_simple([1], use_metahosts)
         self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=1')
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._check_for_extra_schedulings()
 
         self._do_query('UPDATE afe_hosts SET status="Ready", locked=1 '
                        'WHERE id=1')
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._check_for_extra_schedulings()
 
         self._do_query('UPDATE afe_hosts SET locked=0, invalid=1 '
                        'WHERE id=1')
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         if not use_metahosts:
             self._assert_job_scheduled_on(1, 1)
         self._check_for_extra_schedulings()
@@ -335,14 +340,14 @@
         'Only idle hosts get scheduled'
         self._create_job(hosts=[1], active=True)
         self._create_job_simple([1], use_metahosts)
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._check_for_extra_schedulings()
 
 
     def _test_obey_ACLs_helper(self, use_metahosts):
         self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
         self._create_job_simple([1], use_metahosts)
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._check_for_extra_schedulings()
 
 
@@ -370,7 +375,7 @@
         self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
         self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=1')
         self._create_job_simple([1])
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._assert_job_scheduled_on(1, 1)
         self._check_for_extra_schedulings()
 
@@ -416,14 +421,14 @@
     def test_only_if_needed_labels_avoids_host(self):
         job = self._setup_test_only_if_needed_labels()
         # if the job doesn't depend on label3, there should be no scheduling
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._check_for_extra_schedulings()
 
 
     def test_only_if_needed_labels_schedules(self):
         job = self._setup_test_only_if_needed_labels()
         job.dependency_labels.add(self.label3)
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._assert_job_scheduled_on(1, 1)
         self._check_for_extra_schedulings()
 
@@ -434,7 +439,7 @@
         # should also work if the metahost is the only_if_needed label
         self._do_query('DELETE FROM afe_jobs_dependency_labels')
         self._create_job(metahosts=[3])
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._assert_job_scheduled_on(2, 1)
         self._check_for_extra_schedulings()
 
@@ -446,7 +451,7 @@
         """
         self._create_job(metahosts=[1])
         self._create_job(hosts=[1])
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._assert_job_scheduled_on(2, 1)
         self._check_for_extra_schedulings()
 
@@ -460,7 +465,7 @@
         # make the nonmetahost entry complete, so the metahost can try
         # to get scheduled
         self._update_hqe(set='complete = 1', where='host_id=1')
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._check_for_extra_schedulings()
 
 
@@ -533,7 +538,7 @@
     def test_atomic_group_hosts_blocked_from_non_atomic_jobs(self):
         # Create a job scheduled to run on label6.
         self._create_job(metahosts=[self.label6.id])
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         # label6 only has hosts that are in atomic groups associated with it,
         # there should be no scheduling.
         self._check_for_extra_schedulings()
@@ -543,7 +548,7 @@
         # Create a job scheduled to run on label5.  This is an atomic group
         # label but this job does not request atomic group scheduling.
         self._create_job(metahosts=[self.label5.id])
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         # label6 only has hosts that are in atomic groups associated with it,
         # there should be no scheduling.
         self._check_for_extra_schedulings()
@@ -555,7 +560,7 @@
                          atomic_group=1)
         job_b = self._create_job(synchronous=True, metahosts=[self.label5.id],
                          atomic_group=1)
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         # atomic_group.max_number_of_machines was 2 so we should run on 2.
         self._assert_job_scheduled_on_number_of(job_a.id, (5, 6, 7), 2)
         self._assert_job_scheduled_on(job_b.id, 8)  # label5
@@ -570,14 +575,14 @@
         # atomic group as a set of machines to run smaller jobs within (a set
         # of hosts configured for use in network tests with eachother perhaps?)
         onehost_job = self._create_job(atomic_group=1)
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._assert_job_scheduled_on_number_of(onehost_job.id, (5, 6, 7), 1)
         self._check_for_extra_schedulings()
 
         # No more atomic groups have hosts available, no more jobs should
         # be scheduled.
         self._create_job(atomic_group=1)
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._check_for_extra_schedulings()
 
 
@@ -586,7 +591,7 @@
         self._do_query('DELETE FROM afe_acl_groups_hosts '
                        'WHERE host_id in (8,9)')
         job = self._create_job(metahosts=[self.label5.id], atomic_group=1)
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._check_for_extra_schedulings()
 
 
@@ -594,7 +599,7 @@
         # A dependency label that matches no hosts in the atomic group.
         job_a = self._create_job(atomic_group=1)
         job_a.dependency_labels.add(self.label3)
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._check_for_extra_schedulings()
 
 
@@ -603,7 +608,7 @@
         job_b = self._create_job(synchronous=True, metahosts=[self.label4.id],
                                  atomic_group=1)
         job_b.dependency_labels.add(self.label7)
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._check_for_extra_schedulings()
 
 
@@ -612,7 +617,7 @@
         # one of the two atomic group labels.
         job_c = self._create_job(synchronous=True, atomic_group=1)
         job_c.dependency_labels.add(self.label7)
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._assert_job_scheduled_on_number_of(job_c.id, (8, 9), 2)
         self._check_for_extra_schedulings()
 
@@ -622,7 +627,7 @@
         self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9')
         # An atomic job without a metahost.
         job = self._create_job(synchronous=True, atomic_group=1)
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._assert_job_scheduled_on_number_of(job.id, (5, 6, 7), 2)
         self._check_for_extra_schedulings()
 
@@ -633,7 +638,7 @@
         self._do_query('UPDATE afe_hosts SET status="Repair Failed" WHERE id=5')
         job = self._create_job(synchronous=True, metahosts=[self.label4.id],
                          atomic_group=1)
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         # Verify that it was scheduled on the 2 ready hosts in that group.
         self._assert_job_scheduled_on(job.id, 6)
         self._assert_job_scheduled_on(job.id, 7)
@@ -649,7 +654,7 @@
         self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9')
         # Nothing to schedule when no group label has enough (2) good hosts..
         self._create_job(atomic_group=1, synchronous=True)
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         # There are not enough hosts in either atomic group,
         # No more scheduling should occur.
         self._check_for_extra_schedulings()
@@ -657,7 +662,7 @@
         # Now create an atomic job that has a synch count of 1.  It should
         # schedule on exactly one of the hosts.
         onehost_job = self._create_job(atomic_group=1)
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._assert_job_scheduled_on_number_of(onehost_job.id, (7, 8), 1)
 
 
@@ -665,7 +670,7 @@
         self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id in (8,9)')
         self._create_job(synchronous=True, metahosts=[self.label5.id],
                          atomic_group=1)
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         # no hosts in the selected group and label are valid.  no schedulings.
         self._check_for_extra_schedulings()
 
@@ -673,12 +678,12 @@
     def test_atomic_group_scheduling_metahost_works(self):
         # Test that atomic group scheduling also obeys metahosts.
         self._create_job(metahosts=[0], atomic_group=1)
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         # There are no atomic group hosts that also have that metahost.
         self._check_for_extra_schedulings()
 
         job_b = self._create_job(metahosts=[self.label5.id], atomic_group=1)
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._assert_job_scheduled_on(job_b.id, 8)
         self._assert_job_scheduled_on(job_b.id, 9)
         self._check_for_extra_schedulings()
@@ -691,7 +696,7 @@
         models.IneligibleHostQueue.objects.create(job=job, host_id=5)
         models.IneligibleHostQueue.objects.create(job=job, host_id=6)
         models.IneligibleHostQueue.objects.create(job=job, host_id=7)
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         # No scheduling should occur as all desired hosts were ineligible.
         self._check_for_extra_schedulings()
 
@@ -703,7 +708,7 @@
         model_job.synch_count = 4
         model_job.save()
         job = monitor_db.Job(id=model_job.id)
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._check_for_extra_schedulings()
         queue_entries = job.get_host_queue_entries()
         self.assertEqual(1, len(queue_entries))
@@ -723,7 +728,7 @@
         self.label5.atomic_group = None
         self.label5.save()
 
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._check_for_extra_schedulings()
 
 
@@ -732,7 +737,7 @@
         # fail to avoid users inadvertently holding up the use of an
         # entire atomic group by using the machines individually.
         job = self._create_job(hosts=[5])
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._check_for_extra_schedulings()
 
 
@@ -741,14 +746,14 @@
         # work when the atomic group is listed on the HQE in addition
         # to the host (assuming the sync count is 1).
         job = self._create_job(hosts=[5], atomic_group=1)
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._assert_job_scheduled_on(job.id, 5)
         self._check_for_extra_schedulings()
 
 
     def test_schedule_directly_on_atomic_group_hosts_sync2(self):
         job = self._create_job(hosts=[5,8], atomic_group=1, synchronous=True)
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._assert_job_scheduled_on(job.id, 5)
         self._assert_job_scheduled_on(job.id, 8)
         self._check_for_extra_schedulings()
@@ -756,21 +761,21 @@
 
     def test_schedule_directly_on_atomic_group_hosts_wrong_group(self):
         job = self._create_job(hosts=[5,8], atomic_group=2, synchronous=True)
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._check_for_extra_schedulings()
 
 
     def test_only_schedule_queued_entries(self):
         self._create_job(metahosts=[1])
         self._update_hqe(set='active=1, host_id=2')
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._check_for_extra_schedulings()
 
 
     def test_no_ready_hosts(self):
         self._create_job(hosts=[1])
         self._do_query('UPDATE afe_hosts SET status="Repair Failed"')
-        self._dispatcher._schedule_new_jobs()
+        self._run_scheduler()
         self._check_for_extra_schedulings()