First iteration of pluggable metahost handlers. This change adds the basic framework and moves the default, label-based metahost assignment code into a handler. It includes some refactorings to the basic scheduling code to make things a bit cleaner.
Signed-off-by: Steve Howard <showard@google.com>
git-svn-id: http://test.kernel.org/svn/autotest/trunk@4232 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/scheduler/metahost_scheduler.py b/scheduler/metahost_scheduler.py
new file mode 100644
index 0000000..9588e95
--- /dev/null
+++ b/scheduler/metahost_scheduler.py
@@ -0,0 +1,108 @@
+from autotest_lib.client.common_lib import utils
+
+class HostSchedulingUtility(object):
+ """Interface to host availability information from the scheduler."""
+ def hosts_in_label(self, label_id):
+ """Return potentially usable hosts with the given label."""
+ raise NotImplementedError
+
+
+ def remove_host_from_label(self, host_id, label_id):
+ """Remove this host from the internal list of usable hosts in the label.
+
+ This is provided as an optimization -- when code gets a host from a
+ label and concludes it's unusable, it can call this to avoid getting
+ that host again in the future (within this tick). This function should
+ not affect correctness.
+ """
+ raise NotImplementedError
+
+
+ def pop_host(self, host_id):
+ """Remove and return a host from the internal list of available hosts.
+ """
+ raise NotImplementedError
+
+
+ def ineligible_hosts_for_entry(self, queue_entry):
+ """Get the list of hosts ineligible to run the given queue entry."""
+ raise NotImplementedError
+
+
+ def is_host_usable(self, host_id):
+ """Determine if the host is currently usable at all."""
+ raise NotImplementedError
+
+
+ def is_host_eligible_for_job(self, host_id, queue_entry):
+ """Determine if the host is eligible specifically for this queue entry.
+
+ @param queue_entry: a HostQueueEntry DBObject
+ """
+ raise NotImplementedError
+
+
+class MetahostScheduler(object):
+ def can_schedule_metahost(self, queue_entry):
+ """Return true if this object can schedule the given queue entry.
+
+ At most one MetahostScheduler should return true for any given entry.
+
+ @param queue_entry: a HostQueueEntry DBObject
+ """
+ raise NotImplementedError
+
+
+ def schedule_metahost(self, queue_entry, scheduling_utility):
+ """Schedule the given queue entry, if possible.
+
+ This method should make necessary database changes culminating in
+ assigning a host to the given queue entry in the database. It may
+ take no action if no host can be assigned currently.
+
+ @param queue_entry: a HostQueueEntry DBObject
+ @param scheduling_utility: a HostSchedulingUtility object
+ """
+ raise NotImplementedError
+
+
+ def recovery_on_startup(self):
+ """Perform any necessary recovery upon scheduler startup."""
+ pass
+
+
+ def tick(self):
+ """Called once per scheduler cycle; any actions are allowed."""
+ pass
+
+
+class LabelMetahostScheduler(MetahostScheduler):
+ def can_schedule_metahost(self, queue_entry):
+ return bool(queue_entry.meta_host)
+
+
+ def schedule_metahost(self, queue_entry, scheduling_utility):
+ label_id = queue_entry.meta_host
+ hosts_in_label = scheduling_utility.hosts_in_label(label_id)
+ ineligible_host_ids = scheduling_utility.ineligible_hosts_for_entry(
+ queue_entry)
+
+ for host_id in hosts_in_label:
+ if not scheduling_utility.is_host_usable(host_id):
+ scheduling_utility.remove_host_from_label(host_id, label_id)
+ continue
+ if host_id in ineligible_host_ids:
+ continue
+ if not scheduling_utility.is_host_eligible_for_job(host_id,
+ queue_entry):
+ continue
+
+ # Remove the host from our cached internal state before returning
+ scheduling_utility.remove_host_from_label(host_id, label_id)
+ host = scheduling_utility.pop_host(host_id)
+ queue_entry.set_host(host)
+ return
+
+
+def get_metahost_schedulers():
+ return [LabelMetahostScheduler()]
diff --git a/scheduler/metahost_scheduler_unittest.py b/scheduler/metahost_scheduler_unittest.py
new file mode 100644
index 0000000..5933769
--- /dev/null
+++ b/scheduler/metahost_scheduler_unittest.py
@@ -0,0 +1,74 @@
+#!/usr/bin/python
+
+import common
+import unittest
+from autotest_lib.client.common_lib.test_utils import mock
+from autotest_lib.scheduler import metahost_scheduler, monitor_db
+
+class LabelMetahostSchedulerTest(unittest.TestCase):
+ def setUp(self):
+ self.god = mock.mock_god()
+ self.scheduling_utility = self.god.create_mock_class(
+ metahost_scheduler.HostSchedulingUtility, 'utility')
+ self.metahost_scheduler = metahost_scheduler.LabelMetahostScheduler()
+
+
+ def entry(self):
+ return self.god.create_mock_class(monitor_db.HostQueueEntry, 'entry')
+
+
+ def test_can_schedule_metahost(self):
+ entry = self.entry()
+ entry.meta_host = None
+ self.assertFalse(self.metahost_scheduler.can_schedule_metahost(entry))
+
+ entry.meta_host = 1
+ self.assert_(self.metahost_scheduler.can_schedule_metahost(entry))
+
+
+ def test_schedule_metahost(self):
+ entry = self.entry()
+ entry.meta_host = 1
+ host = object()
+
+ self.scheduling_utility.hosts_in_label.expect_call(1).and_return(
+ [2, 3, 4, 5])
+ # 2 is in ineligible_hosts
+ (self.scheduling_utility.ineligible_hosts_for_entry.expect_call(entry)
+ .and_return([2]))
+ self.scheduling_utility.is_host_usable.expect_call(2).and_return(True)
+ # 3 is unusable
+ self.scheduling_utility.is_host_usable.expect_call(3).and_return(False)
+ self.scheduling_utility.remove_host_from_label.expect_call(3, 1)
+ # 4 is ineligible for the job
+ self.scheduling_utility.is_host_usable.expect_call(4).and_return(True)
+ (self.scheduling_utility.is_host_eligible_for_job.expect_call(4, entry)
+ .and_return(False))
+ # 5 runs
+ self.scheduling_utility.is_host_usable.expect_call(5).and_return(True)
+ (self.scheduling_utility.is_host_eligible_for_job.expect_call(5, entry)
+ .and_return(True))
+ self.scheduling_utility.remove_host_from_label.expect_call(5, 1)
+ self.scheduling_utility.pop_host.expect_call(5).and_return(host)
+ entry.set_host.expect_call(host)
+
+ self.metahost_scheduler.schedule_metahost(entry,
+ self.scheduling_utility)
+ self.god.check_playback()
+
+
+ def test_no_hosts(self):
+ entry = self.entry()
+ entry.meta_host = 1
+
+ self.scheduling_utility.hosts_in_label.expect_call(1).and_return(())
+ (self.scheduling_utility.ineligible_hosts_for_entry.expect_call(entry)
+ .and_return(()))
+
+ self.metahost_scheduler.schedule_metahost(entry,
+ self.scheduling_utility)
+ self.god.check_playback()
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index eb21a56..464ad5e 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -24,7 +24,7 @@
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import monitor_db_cleanup
from autotest_lib.scheduler import status_server, scheduler_config
-from autotest_lib.scheduler import gc_stats
+from autotest_lib.scheduler import gc_stats, metahost_scheduler
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'
@@ -78,6 +78,11 @@
return {}
+get_metahost_schedulers = utils.import_site_function(
+ __file__, 'autotest_lib.scheduler.site_metahost_scheduler',
+ 'get_metahost_schedulers', metahost_scheduler.get_metahost_schedulers)
+
+
def main():
try:
try:
@@ -260,7 +265,22 @@
"""Raised by HostScheduler when an inconsistent state occurs."""
-class HostScheduler(object):
+class HostScheduler(metahost_scheduler.HostSchedulingUtility):
+ """Handles the logic for choosing when to run jobs and on which hosts.
+
+ This class makes several queries to the database on each tick, building up
+ some auxiliary data structures and using them to determine which hosts are
+ eligible to run which jobs, taking into account all the various factors that
+ affect that.
+
+ In the past this was done with one or two very large, complex database
+ queries. It has proven much simpler and faster to build these auxiliary
+ data structures and perform the logic in Python.
+ """
+ def __init__(self):
+ self._metahost_schedulers = get_metahost_schedulers()
+
+
def _get_ready_hosts(self):
# avoid any host with a currently active queue entry against it
hosts = Host.fetch(
@@ -378,6 +398,32 @@
self._labels = self._get_labels()
+ def hosts_in_label(self, label_id):
+ """Return potentially usable hosts with the given label."""
+ return set(self._label_hosts.get(label_id, ()))
+
+
+ def remove_host_from_label(self, host_id, label_id):
+ """Remove this host from the internal list of usable hosts in the label.
+
+ This is provided as an optimization -- when code gets a host from a
+ label and concludes it's unusable, it can call this to avoid getting
+ that host again in the future (within this tick).
+ """
+ self._label_hosts[label_id].remove(host_id)
+
+
+ def pop_host(self, host_id):
+ """Remove and return a host from the internal list of available hosts.
+ """
+ return self._hosts_available.pop(host_id)
+
+
+ def ineligible_hosts_for_entry(self, queue_entry):
+ """Get the list of hosts ineligible to run the given queue entry."""
+ return set(self._ineligible_hosts.get(queue_entry.job_id, ()))
+
+
def _is_acl_accessible(self, host_id, queue_entry):
job_acls = self._job_acls.get(queue_entry.job_id, set())
host_acls = self._host_acls.get(host_id, set())
@@ -470,11 +516,11 @@
supplied queue_entry.
"""
return set(host_id for host_id in group_hosts
- if self._is_host_usable(host_id)
- and self._is_host_eligible_for_job(host_id, queue_entry))
+ if self.is_host_usable(host_id)
+ and self.is_host_eligible_for_job(host_id, queue_entry))
- def _is_host_eligible_for_job(self, host_id, queue_entry):
+ def is_host_eligible_for_job(self, host_id, queue_entry):
if self._is_host_invalid(host_id):
# if an invalid host is scheduled for a job, it's a one-time host
# and it therefore bypasses eligibility checks. note this can only
@@ -498,12 +544,12 @@
def _schedule_non_metahost(self, queue_entry):
- if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
+ if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry):
return None
return self._hosts_available.pop(queue_entry.host_id, None)
- def _is_host_usable(self, host_id):
+ def is_host_usable(self, host_id):
if host_id not in self._hosts_available:
# host was already used during this scheduling cycle
return False
@@ -514,35 +560,16 @@
return True
- def _schedule_metahost(self, queue_entry):
- label_id = queue_entry.meta_host
- hosts_in_label = self._label_hosts.get(label_id, set())
- ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
- set())
-
- # must iterate over a copy so we can mutate the original while iterating
- for host_id in list(hosts_in_label):
- if not self._is_host_usable(host_id):
- hosts_in_label.remove(host_id)
- continue
- if host_id in ineligible_host_ids:
- continue
- if not self._is_host_eligible_for_job(host_id, queue_entry):
- continue
-
- # Remove the host from our cached internal state before returning
- # the host object.
- hosts_in_label.remove(host_id)
- return self._hosts_available.pop(host_id)
- return None
-
-
- def find_eligible_host(self, queue_entry):
- if not queue_entry.meta_host:
- assert queue_entry.host_id is not None
+ def schedule_entry(self, queue_entry):
+ if queue_entry.host_id is not None:
return self._schedule_non_metahost(queue_entry)
- assert queue_entry.atomic_group_id is None
- return self._schedule_metahost(queue_entry)
+
+ for scheduler in self._metahost_schedulers:
+ if scheduler.can_schedule_metahost(queue_entry):
+ scheduler.schedule_metahost(queue_entry, self)
+ return None
+
+ raise SchedulerError('No metahost scheduler to handle %s' % queue_entry)
def find_eligible_atomic_group(self, queue_entry):
@@ -580,14 +607,13 @@
job.id, job.synch_count, atomic_group.id,
atomic_group.max_number_of_machines, queue_entry.id)
return []
- hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
- ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
- set())
+ hosts_in_label = self.hosts_in_label(queue_entry.meta_host)
+ ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry)
# Look in each label associated with atomic_group until we find one with
# enough hosts to satisfy the job.
for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
- group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
+ group_hosts = set(self.hosts_in_label(atomic_label_id))
if queue_entry.meta_host is not None:
# If we have a metahost label, only allow its hosts.
group_hosts.intersection_update(hosts_in_label)
@@ -1041,17 +1067,17 @@
logging.info('Expanding atomic group entry %s with hosts %s',
queue_entry,
', '.join(host.hostname for host in group_hosts))
- # The first assigned host uses the original HostQueueEntry
- group_queue_entries = [queue_entry]
+
for assigned_host in group_hosts[1:]:
# Create a new HQE for every additional assigned_host.
new_hqe = HostQueueEntry.clone(queue_entry)
new_hqe.save()
- group_queue_entries.append(new_hqe)
- assert len(group_queue_entries) == len(group_hosts)
- for queue_entry, host in itertools.izip(group_queue_entries,
- group_hosts):
- self._run_queue_entry(queue_entry, host)
+ new_hqe.set_host(assigned_host)
+ self._run_queue_entry(new_hqe)
+
+ # The first assigned host uses the original HostQueueEntry
+ queue_entry.set_host(group_hosts[0])
+ self._run_queue_entry(queue_entry)
def _schedule_hostless_job(self, queue_entry):
@@ -1067,15 +1093,16 @@
is_unassigned_atomic_group = (
queue_entry.atomic_group_id is not None
and queue_entry.host_id is None)
- if is_unassigned_atomic_group:
- self._schedule_atomic_group(queue_entry)
- elif queue_entry.is_hostless():
+
+ if queue_entry.is_hostless():
self._schedule_hostless_job(queue_entry)
+ elif is_unassigned_atomic_group:
+ self._schedule_atomic_group(queue_entry)
else:
- assigned_host = self._host_scheduler.find_eligible_host(
- queue_entry)
+ assigned_host = self._host_scheduler.schedule_entry(queue_entry)
if assigned_host and not self.host_has_agent(assigned_host):
- self._run_queue_entry(queue_entry, assigned_host)
+ assert assigned_host.id == queue_entry.host_id
+ self._run_queue_entry(queue_entry)
def _schedule_running_host_queue_entries(self):
@@ -1091,8 +1118,8 @@
self.add_agent_task(task)
- def _run_queue_entry(self, queue_entry, host):
- queue_entry.schedule_pre_job_tasks(assigned_host=host)
+ def _run_queue_entry(self, queue_entry):
+ queue_entry.schedule_pre_job_tasks()
def _find_aborting(self):
@@ -2907,11 +2934,12 @@
def set_host(self, host):
if host:
+ logging.info('Assigning host %s to entry %s', host.hostname, self)
self.queue_log_record('Assigning host ' + host.hostname)
self.update_field('host_id', host.id)
- self.update_field('active', True)
self.block_host(host.id)
else:
+ logging.info('Releasing host from %s', self)
self.queue_log_record('Releasing host')
self.unblock_host(self.host.id)
self.update_field('host_id', None)
@@ -3039,15 +3067,7 @@
email_manager.manager.send_email(self.job.email_list, subject, body)
- def schedule_pre_job_tasks(self, assigned_host=None):
- if self.meta_host is not None or self.atomic_group:
- assert assigned_host
- # ensure results dir exists for the queue log
- if self.host_id is None:
- self.set_host(assigned_host)
- else:
- assert assigned_host.id == self.host_id
-
+ def schedule_pre_job_tasks(self):
logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
self.job.name, self.meta_host, self.atomic_group_id,
self.job.id, self.id, self.host.hostname, self.status)
diff --git a/scheduler/monitor_db_functional_test.py b/scheduler/monitor_db_functional_test.py
index 6d0de07..687a2ef 100755
--- a/scheduler/monitor_db_functional_test.py
+++ b/scheduler/monitor_db_functional_test.py
@@ -968,7 +968,7 @@
self._run_dispatcher() # delay task started waiting
self.mock_drone_manager.finish_specific_process(
- 'hosts/host5/1-verify', monitor_db._AUTOSERV_PID_FILE)
+ 'hosts/host6/1-verify', monitor_db._AUTOSERV_PID_FILE)
self._run_dispatcher() # job starts now
for entry in queue_entries:
self._check_statuses(entry, HqeStatus.RUNNING, HostStatus.RUNNING)
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py
index ac8240b..0e1d66e 100755
--- a/scheduler/monitor_db_unittest.py
+++ b/scheduler/monitor_db_unittest.py
@@ -286,11 +286,16 @@
self._jobs_scheduled = []
+ def _run_scheduler(self):
+ for _ in xrange(2): # metahost scheduling can take two cycles
+ self._dispatcher._schedule_new_jobs()
+
+
def _test_basic_scheduling_helper(self, use_metahosts):
'Basic nonmetahost scheduling'
self._create_job_simple([1], use_metahosts)
self._create_job_simple([2], use_metahosts)
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._assert_job_scheduled_on(1, 1)
self._assert_job_scheduled_on(2, 2)
self._check_for_extra_schedulings()
@@ -302,7 +307,7 @@
self._create_job_simple([2], use_metahosts)
self._create_job_simple([1,2], use_metahosts)
self._create_job_simple([1], use_metahosts, priority=1)
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._assert_job_scheduled_on(4, 1) # higher priority
self._assert_job_scheduled_on(2, 2) # earlier job over later
self._check_for_extra_schedulings()
@@ -315,17 +320,17 @@
"""
self._create_job_simple([1], use_metahosts)
self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=1')
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._check_for_extra_schedulings()
self._do_query('UPDATE afe_hosts SET status="Ready", locked=1 '
'WHERE id=1')
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._check_for_extra_schedulings()
self._do_query('UPDATE afe_hosts SET locked=0, invalid=1 '
'WHERE id=1')
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
if not use_metahosts:
self._assert_job_scheduled_on(1, 1)
self._check_for_extra_schedulings()
@@ -335,14 +340,14 @@
'Only idle hosts get scheduled'
self._create_job(hosts=[1], active=True)
self._create_job_simple([1], use_metahosts)
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._check_for_extra_schedulings()
def _test_obey_ACLs_helper(self, use_metahosts):
self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
self._create_job_simple([1], use_metahosts)
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._check_for_extra_schedulings()
@@ -370,7 +375,7 @@
self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=1')
self._create_job_simple([1])
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._assert_job_scheduled_on(1, 1)
self._check_for_extra_schedulings()
@@ -416,14 +421,14 @@
def test_only_if_needed_labels_avoids_host(self):
job = self._setup_test_only_if_needed_labels()
# if the job doesn't depend on label3, there should be no scheduling
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._check_for_extra_schedulings()
def test_only_if_needed_labels_schedules(self):
job = self._setup_test_only_if_needed_labels()
job.dependency_labels.add(self.label3)
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._assert_job_scheduled_on(1, 1)
self._check_for_extra_schedulings()
@@ -434,7 +439,7 @@
# should also work if the metahost is the only_if_needed label
self._do_query('DELETE FROM afe_jobs_dependency_labels')
self._create_job(metahosts=[3])
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._assert_job_scheduled_on(2, 1)
self._check_for_extra_schedulings()
@@ -446,7 +451,7 @@
"""
self._create_job(metahosts=[1])
self._create_job(hosts=[1])
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._assert_job_scheduled_on(2, 1)
self._check_for_extra_schedulings()
@@ -460,7 +465,7 @@
# make the nonmetahost entry complete, so the metahost can try
# to get scheduled
self._update_hqe(set='complete = 1', where='host_id=1')
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._check_for_extra_schedulings()
@@ -533,7 +538,7 @@
def test_atomic_group_hosts_blocked_from_non_atomic_jobs(self):
# Create a job scheduled to run on label6.
self._create_job(metahosts=[self.label6.id])
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
# label6 only has hosts that are in atomic groups associated with it,
# there should be no scheduling.
self._check_for_extra_schedulings()
@@ -543,7 +548,7 @@
# Create a job scheduled to run on label5. This is an atomic group
# label but this job does not request atomic group scheduling.
self._create_job(metahosts=[self.label5.id])
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
# label6 only has hosts that are in atomic groups associated with it,
# there should be no scheduling.
self._check_for_extra_schedulings()
@@ -555,7 +560,7 @@
atomic_group=1)
job_b = self._create_job(synchronous=True, metahosts=[self.label5.id],
atomic_group=1)
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
# atomic_group.max_number_of_machines was 2 so we should run on 2.
self._assert_job_scheduled_on_number_of(job_a.id, (5, 6, 7), 2)
self._assert_job_scheduled_on(job_b.id, 8) # label5
@@ -570,14 +575,14 @@
# atomic group as a set of machines to run smaller jobs within (a set
# of hosts configured for use in network tests with eachother perhaps?)
onehost_job = self._create_job(atomic_group=1)
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._assert_job_scheduled_on_number_of(onehost_job.id, (5, 6, 7), 1)
self._check_for_extra_schedulings()
# No more atomic groups have hosts available, no more jobs should
# be scheduled.
self._create_job(atomic_group=1)
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._check_for_extra_schedulings()
@@ -586,7 +591,7 @@
self._do_query('DELETE FROM afe_acl_groups_hosts '
'WHERE host_id in (8,9)')
job = self._create_job(metahosts=[self.label5.id], atomic_group=1)
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._check_for_extra_schedulings()
@@ -594,7 +599,7 @@
# A dependency label that matches no hosts in the atomic group.
job_a = self._create_job(atomic_group=1)
job_a.dependency_labels.add(self.label3)
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._check_for_extra_schedulings()
@@ -603,7 +608,7 @@
job_b = self._create_job(synchronous=True, metahosts=[self.label4.id],
atomic_group=1)
job_b.dependency_labels.add(self.label7)
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._check_for_extra_schedulings()
@@ -612,7 +617,7 @@
# one of the two atomic group labels.
job_c = self._create_job(synchronous=True, atomic_group=1)
job_c.dependency_labels.add(self.label7)
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._assert_job_scheduled_on_number_of(job_c.id, (8, 9), 2)
self._check_for_extra_schedulings()
@@ -622,7 +627,7 @@
self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9')
# An atomic job without a metahost.
job = self._create_job(synchronous=True, atomic_group=1)
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._assert_job_scheduled_on_number_of(job.id, (5, 6, 7), 2)
self._check_for_extra_schedulings()
@@ -633,7 +638,7 @@
self._do_query('UPDATE afe_hosts SET status="Repair Failed" WHERE id=5')
job = self._create_job(synchronous=True, metahosts=[self.label4.id],
atomic_group=1)
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
# Verify that it was scheduled on the 2 ready hosts in that group.
self._assert_job_scheduled_on(job.id, 6)
self._assert_job_scheduled_on(job.id, 7)
@@ -649,7 +654,7 @@
self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9')
# Nothing to schedule when no group label has enough (2) good hosts..
self._create_job(atomic_group=1, synchronous=True)
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
# There are not enough hosts in either atomic group,
# No more scheduling should occur.
self._check_for_extra_schedulings()
@@ -657,7 +662,7 @@
# Now create an atomic job that has a synch count of 1. It should
# schedule on exactly one of the hosts.
onehost_job = self._create_job(atomic_group=1)
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._assert_job_scheduled_on_number_of(onehost_job.id, (7, 8), 1)
@@ -665,7 +670,7 @@
self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id in (8,9)')
self._create_job(synchronous=True, metahosts=[self.label5.id],
atomic_group=1)
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
# no hosts in the selected group and label are valid. no schedulings.
self._check_for_extra_schedulings()
@@ -673,12 +678,12 @@
def test_atomic_group_scheduling_metahost_works(self):
# Test that atomic group scheduling also obeys metahosts.
self._create_job(metahosts=[0], atomic_group=1)
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
# There are no atomic group hosts that also have that metahost.
self._check_for_extra_schedulings()
job_b = self._create_job(metahosts=[self.label5.id], atomic_group=1)
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._assert_job_scheduled_on(job_b.id, 8)
self._assert_job_scheduled_on(job_b.id, 9)
self._check_for_extra_schedulings()
@@ -691,7 +696,7 @@
models.IneligibleHostQueue.objects.create(job=job, host_id=5)
models.IneligibleHostQueue.objects.create(job=job, host_id=6)
models.IneligibleHostQueue.objects.create(job=job, host_id=7)
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
# No scheduling should occur as all desired hosts were ineligible.
self._check_for_extra_schedulings()
@@ -703,7 +708,7 @@
model_job.synch_count = 4
model_job.save()
job = monitor_db.Job(id=model_job.id)
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._check_for_extra_schedulings()
queue_entries = job.get_host_queue_entries()
self.assertEqual(1, len(queue_entries))
@@ -723,7 +728,7 @@
self.label5.atomic_group = None
self.label5.save()
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._check_for_extra_schedulings()
@@ -732,7 +737,7 @@
# fail to avoid users inadvertently holding up the use of an
# entire atomic group by using the machines individually.
job = self._create_job(hosts=[5])
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._check_for_extra_schedulings()
@@ -741,14 +746,14 @@
# work when the atomic group is listed on the HQE in addition
# to the host (assuming the sync count is 1).
job = self._create_job(hosts=[5], atomic_group=1)
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._assert_job_scheduled_on(job.id, 5)
self._check_for_extra_schedulings()
def test_schedule_directly_on_atomic_group_hosts_sync2(self):
job = self._create_job(hosts=[5,8], atomic_group=1, synchronous=True)
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._assert_job_scheduled_on(job.id, 5)
self._assert_job_scheduled_on(job.id, 8)
self._check_for_extra_schedulings()
@@ -756,21 +761,21 @@
def test_schedule_directly_on_atomic_group_hosts_wrong_group(self):
job = self._create_job(hosts=[5,8], atomic_group=2, synchronous=True)
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._check_for_extra_schedulings()
def test_only_schedule_queued_entries(self):
self._create_job(metahosts=[1])
self._update_hqe(set='active=1, host_id=2')
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._check_for_extra_schedulings()
def test_no_ready_hosts(self):
self._create_job(hosts=[1])
self._do_query('UPDATE afe_hosts SET status="Repair Failed"')
- self._dispatcher._schedule_new_jobs()
+ self._run_scheduler()
self._check_for_extra_schedulings()