[autotest] RDB refactor I
Initial refactor for the rdb; implements step 1 in this schematic:
https://x20web.corp.google.com/~beeps/rdb_v1_midway.jpg
Also achieves the following:
- Don't process an hqe more than once after having assigned a host to it.
- Don't assign a host to a queued, aborted hqe.
- Drop the metahost concept.
- Stop using labelmetahostscheduler to find hosts for non-metahost jobs.
- Include a database migration script for jobs that were still queued during
the scheduler restart, since their meta_host must now become a dependency
label.
This CL also removes the scheduler's ability to:
- Schedule an atomic group
* Consequently, also the ability to block a host even when the hqe using it is
no longer active.
- Schedule a metahost differently from a non-metahost
* Both metahosts and non-metahosts are just labels now
* Jobs which are already assigned hosts are still given precedence, though
- Schedule based on only_if_needed.
And fixes the unittests appropriately.
TEST=Ran suites, unittests. Restarted scheduler after applying these changes
and tested migration. Ran suite scheduler.
BUG=chromium:314082,chromium:314219,chromium:313680,chromium:315824,chromium:312333
DEPLOY=scheduler, migrate
Change-Id: I70c3c3c740e51581db88fe3ce5879c53d6e6511e
Reviewed-on: https://chromium-review.googlesource.com/175957
Reviewed-by: Alex Miller <milleral@chromium.org>
Commit-Queue: Prashanth B <beeps@chromium.org>
Tested-by: Prashanth B <beeps@chromium.org>
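Before the diff: a minimal, runnable sketch of the job<->host validation the
new rdb performs. The helper bodies are copied from scheduler/rdb.py below;
the sample job/host dictionaries are made up for illustration.

    # Helpers copied from scheduler/rdb.py; the data below is hypothetical.
    def _check_acls(job_acls, host_acls):
        # True-ish (non-zero) when the job and host share at least one acl.
        if job_acls is None or host_acls is None:
            return False
        return len(host_acls.intersection(job_acls))

    def _check_deps(job_deps, host_labels):
        # True when every job dependency label is present on the host.
        if job_deps is None or host_labels is None:
            return False
        return len(job_deps - host_labels) == 0

    job_info = {'deps': set(['board:lumpy', 'pool:bvt']),
                'acls': set(['acl_everyone']), 'host_id': None}
    host_info = {'labels': set(['board:lumpy', 'pool:bvt']),
                 'acls': set(['acl_everyone']),
                 'invalid': False, 'locked': False}

    assert _check_acls(job_info['acls'], host_info['acls'])
    assert _check_deps(job_info['deps'], host_info['labels'])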
diff --git a/frontend/afe/frontend_test_utils.py b/frontend/afe/frontend_test_utils.py
index 158f1ac..6f53af3 100644
--- a/frontend/afe/frontend_test_utils.py
+++ b/frontend/afe/frontend_test_utils.py
@@ -121,6 +121,12 @@
reboot_before=model_attributes.RebootBefore.NEVER,
drone_set=drone_set, control_file=control_file,
parameterized_job=parameterized_job)
+
+ # Update the job's dependencies to include the metahost.
+ for metahost_label in metahosts:
+ dep = models.Label.objects.get(id=metahost_label)
+ job.dependency_labels.add(dep)
+
for host_id in hosts:
models.HostQueueEntry.objects.create(job=job, host_id=host_id,
status=status,
diff --git a/frontend/migrations/084_convert_metahost_to_label.py b/frontend/migrations/084_convert_metahost_to_label.py
new file mode 100644
index 0000000..9fb154c
--- /dev/null
+++ b/frontend/migrations/084_convert_metahost_to_label.py
@@ -0,0 +1,8 @@
+UP_SQL = """
+INSERT INTO afe_jobs_dependency_labels (job_id, label_id)
+SELECT job_id, meta_host FROM afe_host_queue_entries
+WHERE NOT complete AND NOT active AND status="Queued" AND NOT aborted;
+"""
+
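+# The conversion cannot be cleanly reversed, so DOWN_SQL is intentionally
+# a no-op.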
+DOWN_SQL = """
+"""
diff --git a/scheduler/atomic_group_unittests.py b/scheduler/atomic_group_unittests.py
new file mode 100644
index 0000000..5a9e9c9
--- /dev/null
+++ b/scheduler/atomic_group_unittests.py
@@ -0,0 +1,329 @@
+#!/usr/bin/python
+#pylint: disable-msg=C0111
+
+import gc, time
+import common
+from autotest_lib.frontend import setup_django_environment
+from autotest_lib.frontend.afe import frontend_test_utils
+from autotest_lib.client.common_lib.test_utils import mock
+from autotest_lib.client.common_lib.test_utils import unittest
+from autotest_lib.database import database_connection
+from autotest_lib.frontend.afe import models
+from autotest_lib.scheduler import agent_task
+from autotest_lib.scheduler import monitor_db, drone_manager, email_manager
+from autotest_lib.scheduler import pidfile_monitor
+from autotest_lib.scheduler import scheduler_config, gc_stats, host_scheduler
+from autotest_lib.scheduler import monitor_db_functional_test
+from autotest_lib.scheduler import monitor_db_unittest
+from autotest_lib.scheduler import scheduler_models
+
+_DEBUG = False
+
+
+class AtomicGroupTest(monitor_db_unittest.DispatcherSchedulingTest):
+
+ def test_atomic_group_hosts_blocked_from_non_atomic_jobs(self):
+ # Create a job scheduled to run on label6.
+ self._create_job(metahosts=[self.label6.id])
+ self._run_scheduler()
+ # label6 only has hosts that are in atomic groups associated with it,
+ # there should be no scheduling.
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_hosts_blocked_from_non_atomic_jobs_explicit(self):
+ # Create a job scheduled to run on label5. This is an atomic group
+ # label but this job does not request atomic group scheduling.
+ self._create_job(metahosts=[self.label5.id])
+ self._run_scheduler()
+ # label5's hosts all belong to an atomic group and this job did not
+ # request atomic group scheduling, so nothing should be scheduled.
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_basics(self):
+ # Create jobs scheduled to run on an atomic group.
+ job_a = self._create_job(synchronous=True, metahosts=[self.label4.id],
+ atomic_group=1)
+ job_b = self._create_job(synchronous=True, metahosts=[self.label5.id],
+ atomic_group=1)
+ self._run_scheduler()
+ # atomic_group.max_number_of_machines was 2 so we should run on 2.
+ self._assert_job_scheduled_on_number_of(job_a.id, (5, 6, 7), 2)
+ self._assert_job_scheduled_on(job_b.id, 8) # label5
+ self._assert_job_scheduled_on(job_b.id, 9) # label5
+ self._check_for_extra_schedulings()
+
+ # The three host label4 atomic group still has one host available.
+ # That means a job with a synch_count of 1 asking to be scheduled on
+ # the atomic group can still use the final machine.
+ #
+ # This may seem like a somewhat odd use case. It allows the use of an
+ # atomic group as a set of machines to run smaller jobs within (a set
+ # of hosts configured for use in network tests with each other perhaps?)
+ onehost_job = self._create_job(atomic_group=1)
+ self._run_scheduler()
+ self._assert_job_scheduled_on_number_of(onehost_job.id, (5, 6, 7), 1)
+ self._check_for_extra_schedulings()
+
+ # No more atomic groups have hosts available, no more jobs should
+ # be scheduled.
+ self._create_job(atomic_group=1)
+ self._run_scheduler()
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_obeys_acls(self):
+ # Request scheduling on a specific atomic label but be denied by ACLs.
+ self._do_query('DELETE FROM afe_acl_groups_hosts '
+ 'WHERE host_id in (8,9)')
+ job = self._create_job(metahosts=[self.label5.id], atomic_group=1)
+ self._run_scheduler()
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_dependency_label_exclude(self):
+ # A dependency label that matches no hosts in the atomic group.
+ job_a = self._create_job(atomic_group=1)
+ job_a.dependency_labels.add(self.label3)
+ self._run_scheduler()
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_metahost_dependency_label_exclude(self):
+ # A metahost and dependency label that excludes too many hosts.
+ job_b = self._create_job(synchronous=True, metahosts=[self.label4.id],
+ atomic_group=1)
+ job_b.dependency_labels.add(self.label7)
+ self._run_scheduler()
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_dependency_label_match(self):
+ # A dependency label that exists on enough atomic group hosts in only
+ # one of the two atomic group labels.
+ job_c = self._create_job(synchronous=True, atomic_group=1)
+ job_c.dependency_labels.add(self.label7)
+ self._run_scheduler()
+ self._assert_job_scheduled_on_number_of(job_c.id, (8, 9), 2)
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_no_metahost(self):
+ # Force it to schedule on the other group for a reliable test.
+ self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9')
+ # An atomic job without a metahost.
+ job = self._create_job(synchronous=True, atomic_group=1)
+ self._run_scheduler()
+ self._assert_job_scheduled_on_number_of(job.id, (5, 6, 7), 2)
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_partial_group(self):
+ # Make one host in labels[3] unavailable so that there are only two
+ # hosts left in the group.
+ self._do_query('UPDATE afe_hosts SET status="Repair Failed" WHERE id=5')
+ job = self._create_job(synchronous=True, metahosts=[self.label4.id],
+ atomic_group=1)
+ self._run_scheduler()
+ # Verify that it was scheduled on the 2 ready hosts in that group.
+ self._assert_job_scheduled_on(job.id, 6)
+ self._assert_job_scheduled_on(job.id, 7)
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_not_enough_available(self):
+ # Mark some hosts in each atomic group label as not usable.
+ # One host running, another invalid in the first group label.
+ self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=5')
+ self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=6')
+ # One host invalid in the second group label.
+ self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9')
+ # Nothing to schedule when no group label has enough (2) good hosts.
+ self._create_job(atomic_group=1, synchronous=True)
+ self._run_scheduler()
+ # There are not enough hosts in either atomic group,
+ # No more scheduling should occur.
+ self._check_for_extra_schedulings()
+
+ # Now create an atomic job that has a synch count of 1. It should
+ # schedule on exactly one of the hosts.
+ onehost_job = self._create_job(atomic_group=1)
+ self._run_scheduler()
+ self._assert_job_scheduled_on_number_of(onehost_job.id, (7, 8), 1)
+
+
+ def test_atomic_group_scheduling_no_valid_hosts(self):
+ self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id in (8,9)')
+ self._create_job(synchronous=True, metahosts=[self.label5.id],
+ atomic_group=1)
+ self._run_scheduler()
+ # no hosts in the selected group and label are valid. no schedulings.
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_metahost_works(self):
+ # Test that atomic group scheduling also obeys metahosts.
+ self._create_job(metahosts=[0], atomic_group=1)
+ self._run_scheduler()
+ # There are no atomic group hosts that also have that metahost.
+ self._check_for_extra_schedulings()
+
+ job_b = self._create_job(metahosts=[self.label5.id], atomic_group=1)
+ self._run_scheduler()
+ self._assert_job_scheduled_on(job_b.id, 8)
+ self._assert_job_scheduled_on(job_b.id, 9)
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_skips_ineligible_hosts(self):
+ # Test hosts marked ineligible for this job are not eligible.
+ # How would this ever happen anyways?
+ job = self._create_job(metahosts=[self.label4.id], atomic_group=1)
+ models.IneligibleHostQueue.objects.create(job=job, host_id=5)
+ models.IneligibleHostQueue.objects.create(job=job, host_id=6)
+ models.IneligibleHostQueue.objects.create(job=job, host_id=7)
+ self._run_scheduler()
+ # No scheduling should occur as all desired hosts were ineligible.
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_fail(self):
+ # If synch_count is > the atomic group number of machines, the job
+ # should be aborted immediately.
+ model_job = self._create_job(synchronous=True, atomic_group=1)
+ model_job.synch_count = 4
+ model_job.save()
+ job = scheduler_models.Job(id=model_job.id)
+ self._run_scheduler()
+ self._check_for_extra_schedulings()
+ queue_entries = job.get_host_queue_entries()
+ self.assertEqual(1, len(queue_entries))
+ self.assertEqual(queue_entries[0].status,
+ models.HostQueueEntry.Status.ABORTED)
+
+
+ def test_atomic_group_no_labels_no_scheduling(self):
+ # Never schedule on atomic groups marked invalid.
+ job = self._create_job(metahosts=[self.label5.id], synchronous=True,
+ atomic_group=1)
+ # Deleting an atomic group via the frontend marks it invalid and
+ # removes all label references to the group. The job now references
+ # an invalid atomic group with no labels associated with it.
+ self.label5.atomic_group.invalid = True
+ self.label5.atomic_group.save()
+ self.label5.atomic_group = None
+ self.label5.save()
+
+ self._run_scheduler()
+ self._check_for_extra_schedulings()
+
+
+ def test_schedule_directly_on_atomic_group_host_fail(self):
+ # Scheduling a job directly on hosts in an atomic group must
+ # fail to avoid users inadvertently holding up the use of an
+ # entire atomic group by using the machines individually.
+ job = self._create_job(hosts=[5])
+ self._run_scheduler()
+ self._check_for_extra_schedulings()
+
+
+ def test_schedule_directly_on_atomic_group_host(self):
+ # Scheduling a job directly on one host in an atomic group will
+ # work when the atomic group is listed on the HQE in addition
+ # to the host (assuming the sync count is 1).
+ job = self._create_job(hosts=[5], atomic_group=1)
+ self._run_scheduler()
+ self._assert_job_scheduled_on(job.id, 5)
+ self._check_for_extra_schedulings()
+
+
+ def test_schedule_directly_on_atomic_group_hosts_sync2(self):
+ job = self._create_job(hosts=[5,8], atomic_group=1, synchronous=True)
+ self._run_scheduler()
+ self._assert_job_scheduled_on(job.id, 5)
+ self._assert_job_scheduled_on(job.id, 8)
+ self._check_for_extra_schedulings()
+
+
+ def test_schedule_directly_on_atomic_group_hosts_wrong_group(self):
+ job = self._create_job(hosts=[5,8], atomic_group=2, synchronous=True)
+ self._run_scheduler()
+ self._check_for_extra_schedulings()
+
+
+ # TODO(gps): These should probably live in their own TestCase class
+ # specific to testing HostScheduler methods directly. It was convenient
+ # to put it here for now to share existing test environment setup code.
+ def test_HostScheduler_check_atomic_group_labels(self):
+ normal_job = self._create_job(metahosts=[0])
+ atomic_job = self._create_job(atomic_group=1)
+ # Indirectly initialize the internal state of the host scheduler.
+ self._dispatcher._refresh_pending_queue_entries()
+
+ atomic_hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%d' %
+ atomic_job.id)[0]
+ normal_hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%d' %
+ normal_job.id)[0]
+
+ host_scheduler = self._dispatcher._host_scheduler
+ self.assertTrue(host_scheduler._check_atomic_group_labels(
+ [self.label4.id], atomic_hqe))
+ self.assertFalse(host_scheduler._check_atomic_group_labels(
+ [self.label4.id], normal_hqe))
+ self.assertFalse(host_scheduler._check_atomic_group_labels(
+ [self.label5.id, self.label6.id, self.label7.id], normal_hqe))
+ self.assertTrue(host_scheduler._check_atomic_group_labels(
+ [self.label4.id, self.label6.id], atomic_hqe))
+ self.assertTrue(host_scheduler._check_atomic_group_labels(
+ [self.label4.id, self.label5.id],
+ atomic_hqe))
+
+
+class OnlyIfNeededTest(monitor_db_unittest.DispatcherSchedulingTest):
+
+ def _setup_test_only_if_needed_labels(self):
+ # apply only_if_needed label3 to host1
+ models.Host.smart_get('host1').labels.add(self.label3)
+ return self._create_job_simple([1], use_metahost=True)
+
+
+ def test_only_if_needed_labels_avoids_host(self):
+ job = self._setup_test_only_if_needed_labels()
+ # if the job doesn't depend on label3, there should be no scheduling
+ self._run_scheduler()
+ self._check_for_extra_schedulings()
+
+
+ def test_only_if_needed_labels_schedules(self):
+ job = self._setup_test_only_if_needed_labels()
+ job.dependency_labels.add(self.label3)
+ self._run_scheduler()
+ self._assert_job_scheduled_on(1, 1)
+ self._check_for_extra_schedulings()
+
+
+ def test_only_if_needed_labels_via_metahost(self):
+ job = self._setup_test_only_if_needed_labels()
+ job.dependency_labels.add(self.label3)
+ # should also work if the metahost is the only_if_needed label
+ self._do_query('DELETE FROM afe_jobs_dependency_labels')
+ self._create_job(metahosts=[3])
+ self._run_scheduler()
+ self._assert_job_scheduled_on(2, 1)
+ self._check_for_extra_schedulings()
+
+
+ def test_metahosts_obey_blocks(self):
+ """
+ Metahosts can't get scheduled on hosts already scheduled for
+ that job.
+ """
+ self._create_job(metahosts=[1], hosts=[1])
+ # make the nonmetahost entry complete, so the metahost can try
+ # to get scheduled
+ self._update_hqe(set='complete = 1', where='host_id=1')
+ self._run_scheduler()
+ self._check_for_extra_schedulings()
+
+
diff --git a/scheduler/host_scheduler.py b/scheduler/host_scheduler.py
index da712c2..b22230b 100644
--- a/scheduler/host_scheduler.py
+++ b/scheduler/host_scheduler.py
@@ -1,5 +1,5 @@
"""
-Autotest scheduling utility.
+Autotest client module for the rdb.
"""
@@ -8,7 +8,7 @@
from autotest_lib.client.common_lib import global_config, utils
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import metahost_scheduler, scheduler_config
-from autotest_lib.scheduler import scheduler_models
+from autotest_lib.scheduler import rdb, scheduler_models
from autotest_lib.site_utils.graphite import stats
from autotest_lib.server.cros import provision
@@ -350,12 +350,6 @@
return host_object and host_object.invalid
- def _schedule_non_metahost(self, queue_entry):
- if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry):
- return None
- return self._hosts_available.pop(queue_entry.host_id, None)
-
-
def is_host_usable(self, host_id):
if host_id not in self._hosts_available:
# host was already used during this scheduling cycle
@@ -367,17 +361,100 @@
return True
+ def get_job_info(self, queue_entry):
+ """
+ Extract job information from a queue_entry/host-scheduler.
+
+ Unfortunately, the information needed to choose hosts for a job
+ is split across several tables and not restricted to just the
+ hqe. At the very least we require the deps and acls of the job, but
+ we also need to know if the job has a host assigned to it. This method
+ consolidates this information into a light-weight dictionary that the
+ host_scheduler and the rdb can pass back and forth.
+
+ @param queue_entry: the queue_entry of the job we would like
+ information about.
+
+ @return: A dictionary containing 1. A set of deps 2. A set of acls
+ 3. The host id of the host assigned to the hqe, or None.
+ """
+ job_id = queue_entry.job_id
+ host_id = queue_entry.host_id
+ job_deps = self._job_dependencies.get(job_id, set())
+ job_deps = set([dep for dep in job_deps if
+ not provision.can_provision(self._labels[dep].name)])
+ job_acls = self._job_acls.get(job_id, set())
+
+ return {'deps': set(job_deps),
+ 'acls': set(job_acls),
+ 'host_id': host_id}
+
+
def schedule_entry(self, queue_entry):
- logging.debug('Host_scheduler is scheduling entry: %s', queue_entry)
- if queue_entry.host_id is not None:
- return self._schedule_non_metahost(queue_entry)
+ """
+ Schedule an hqe against a host.
- for scheduler in self._metahost_schedulers:
- if scheduler.can_schedule_metahost(queue_entry):
- scheduler.schedule_metahost(queue_entry, self)
+ An hqe may or may not already have a host assigned to it. In either
+ case, actually scheduling the hqe on the host involves validating
+ the assignment by checking acls and labels. If the hqe doesn't have
+ a host, we need to find one before we can perform this validation.
+
+ If we successfully validate the host->hqe pairing, return the host. The
+ scheduler will not begin scheduling special tasks for the hqe until it
+ acquires a valid host.
+
+ @param queue_entry: The queue_entry that requires a host.
+ @return: The host assigned to the hqe, if any.
+ """
+ host_id = queue_entry.host_id
+ job_id = queue_entry.job_id
+ job_info = self.get_job_info(queue_entry)
+ host = None
+
+ if host_id:
+ host = self._hosts_available.get(host_id, None)
+
+ # TODO(beeps): Remove the need for 2 rdb calls. Ideally we should
+ # just do one call to validate the assignment, however, since we're
+ # currently still using the host_scheduler, we'd need to pass it
+ # as an argument to validate_host_assignment, which is less clean
+ # than just splitting this work into 2 calls.
+ host_info = rdb.get_host_info(self, host_id)
+
+ # If the host is either unavailable or in-eligible for this job,
+ # defer scheduling this queue_entry till the next tick.
+ if (host is None or not
+ rdb.validate_host_assignment(job_info, host_info)):
return None
+ else:
+ host = rdb.get_host(self, job_info)
+ if host is None:
+ return None
+ queue_entry.set_host(host)
- raise SchedulerError('No metahost scheduler to handle %s' % queue_entry)
+ # TODO(beeps): Make it so we don't need to set the hqe active status
+ # to remove a host from the active pool.
+ # A host will remain in the available pool for as long as its status
+ # is Ready and it is not referenced by an active hqe. The state of
+ # the host is not under our control, as it will only change to
+ # resetting etc whenever the prejob task starts. However, the hqe
+ # is theoretically active from the moment we assign a healthy host
+ # to it. Setting the host on an inactive hqe will not remove it
+ # from the available pool, leading to unnecessary scheduling
+ # overhead.
+ # Without this, we will process each hqe twice because it is still
+ # judged as 'new', and perform the host<->hqe assignment twice,
+ # because the host assigned to the hqe is still 'available', as
+ # the first prejob task only runs at the end of the next tick's
+ # handle_agents call. Note that the status is still 'Queued', and
+ # will remain 'Queued' till an agent changes it.
+ queue_entry.update_field('active', True)
+
+ # The _hosts_available dictionary determines our scheduling decisions
+ # for subsequent jobs processed in this tick.
+ self._hosts_available.pop(host.id)
+ logging.debug('Scheduling job: %s, Host %s', job_id, host.id)
+ return host
def find_eligible_atomic_group(self, queue_entry):
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 29dc2c1..a0dff2b 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -753,10 +753,11 @@
'ISNULL(host_id), '
'ISNULL(meta_host), '
'job_id')
+ query = ('NOT complete AND NOT active AND status="Queued" '
+ 'AND NOT aborted')
return list(scheduler_models.HostQueueEntry.fetch(
joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
- where='NOT complete AND NOT active AND status="Queued"',
- order_by=sort_order))
+ where=query, order_by=sort_order))
def _refresh_pending_queue_entries(self):
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py
index b4cfcda..403ea38 100755
--- a/scheduler/monitor_db_unittest.py
+++ b/scheduler/monitor_db_unittest.py
@@ -346,38 +346,6 @@
self._test_obey_ACLs_helper(True)
- def _setup_test_only_if_needed_labels(self):
- # apply only_if_needed label3 to host1
- models.Host.smart_get('host1').labels.add(self.label3)
- return self._create_job_simple([1], use_metahost=True)
-
-
- def test_only_if_needed_labels_avoids_host(self):
- job = self._setup_test_only_if_needed_labels()
- # if the job doesn't depend on label3, there should be no scheduling
- self._run_scheduler()
- self._check_for_extra_schedulings()
-
-
- def test_only_if_needed_labels_schedules(self):
- job = self._setup_test_only_if_needed_labels()
- job.dependency_labels.add(self.label3)
- self._run_scheduler()
- self._assert_job_scheduled_on(1, 1)
- self._check_for_extra_schedulings()
-
-
- def test_only_if_needed_labels_via_metahost(self):
- job = self._setup_test_only_if_needed_labels()
- job.dependency_labels.add(self.label3)
- # should also work if the metahost is the only_if_needed label
- self._do_query('DELETE FROM afe_jobs_dependency_labels')
- self._create_job(metahosts=[3])
- self._run_scheduler()
- self._assert_job_scheduled_on(2, 1)
- self._check_for_extra_schedulings()
-
-
def test_nonmetahost_over_metahost(self):
"""
Non-metahost entries should take priority over metahost entries
@@ -390,46 +358,6 @@
self._check_for_extra_schedulings()
- def test_metahosts_obey_blocks(self):
- """
- Metahosts can't get scheduled on hosts already scheduled for
- that job.
- """
- self._create_job(metahosts=[1], hosts=[1])
- # make the nonmetahost entry complete, so the metahost can try
- # to get scheduled
- self._update_hqe(set='complete = 1', where='host_id=1')
- self._run_scheduler()
- self._check_for_extra_schedulings()
-
-
- # TODO(gps): These should probably live in their own TestCase class
- # specific to testing HostScheduler methods directly. It was convenient
- # to put it here for now to share existing test environment setup code.
- def test_HostScheduler_check_atomic_group_labels(self):
- normal_job = self._create_job(metahosts=[0])
- atomic_job = self._create_job(atomic_group=1)
- # Indirectly initialize the internal state of the host scheduler.
- self._dispatcher._refresh_pending_queue_entries()
-
- atomic_hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%d' %
- atomic_job.id)[0]
- normal_hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%d' %
- normal_job.id)[0]
-
- host_scheduler = self._dispatcher._host_scheduler
- self.assertTrue(host_scheduler._check_atomic_group_labels(
- [self.label4.id], atomic_hqe))
- self.assertFalse(host_scheduler._check_atomic_group_labels(
- [self.label4.id], normal_hqe))
- self.assertFalse(host_scheduler._check_atomic_group_labels(
- [self.label5.id, self.label6.id, self.label7.id], normal_hqe))
- self.assertTrue(host_scheduler._check_atomic_group_labels(
- [self.label4.id, self.label6.id], atomic_hqe))
- self.assertTrue(host_scheduler._check_atomic_group_labels(
- [self.label4.id, self.label5.id],
- atomic_hqe))
-
# TODO: Revive this test.
# def test_HostScheduler_get_host_atomic_group_id(self):
# job = self._create_job(metahosts=[self.label6.id])
@@ -470,237 +398,6 @@
# self.assertEquals(1, host_scheduler._get_host_atomic_group_id(
# [self.label7.id, self.label5.id]))
-
- def test_atomic_group_hosts_blocked_from_non_atomic_jobs(self):
- # Create a job scheduled to run on label6.
- self._create_job(metahosts=[self.label6.id])
- self._run_scheduler()
- # label6 only has hosts that are in atomic groups associated with it,
- # there should be no scheduling.
- self._check_for_extra_schedulings()
-
-
- def test_atomic_group_hosts_blocked_from_non_atomic_jobs_explicit(self):
- # Create a job scheduled to run on label5. This is an atomic group
- # label but this job does not request atomic group scheduling.
- self._create_job(metahosts=[self.label5.id])
- self._run_scheduler()
- # label6 only has hosts that are in atomic groups associated with it,
- # there should be no scheduling.
- self._check_for_extra_schedulings()
-
-
- def test_atomic_group_scheduling_basics(self):
- # Create jobs scheduled to run on an atomic group.
- job_a = self._create_job(synchronous=True, metahosts=[self.label4.id],
- atomic_group=1)
- job_b = self._create_job(synchronous=True, metahosts=[self.label5.id],
- atomic_group=1)
- self._run_scheduler()
- # atomic_group.max_number_of_machines was 2 so we should run on 2.
- self._assert_job_scheduled_on_number_of(job_a.id, (5, 6, 7), 2)
- self._assert_job_scheduled_on(job_b.id, 8) # label5
- self._assert_job_scheduled_on(job_b.id, 9) # label5
- self._check_for_extra_schedulings()
-
- # The three host label4 atomic group still has one host available.
- # That means a job with a synch_count of 1 asking to be scheduled on
- # the atomic group can still use the final machine.
- #
- # This may seem like a somewhat odd use case. It allows the use of an
- # atomic group as a set of machines to run smaller jobs within (a set
- # of hosts configured for use in network tests with eachother perhaps?)
- onehost_job = self._create_job(atomic_group=1)
- self._run_scheduler()
- self._assert_job_scheduled_on_number_of(onehost_job.id, (5, 6, 7), 1)
- self._check_for_extra_schedulings()
-
- # No more atomic groups have hosts available, no more jobs should
- # be scheduled.
- self._create_job(atomic_group=1)
- self._run_scheduler()
- self._check_for_extra_schedulings()
-
-
- def test_atomic_group_scheduling_obeys_acls(self):
- # Request scheduling on a specific atomic label but be denied by ACLs.
- self._do_query('DELETE FROM afe_acl_groups_hosts '
- 'WHERE host_id in (8,9)')
- job = self._create_job(metahosts=[self.label5.id], atomic_group=1)
- self._run_scheduler()
- self._check_for_extra_schedulings()
-
-
- def test_atomic_group_scheduling_dependency_label_exclude(self):
- # A dependency label that matches no hosts in the atomic group.
- job_a = self._create_job(atomic_group=1)
- job_a.dependency_labels.add(self.label3)
- self._run_scheduler()
- self._check_for_extra_schedulings()
-
-
- def test_atomic_group_scheduling_metahost_dependency_label_exclude(self):
- # A metahost and dependency label that excludes too many hosts.
- job_b = self._create_job(synchronous=True, metahosts=[self.label4.id],
- atomic_group=1)
- job_b.dependency_labels.add(self.label7)
- self._run_scheduler()
- self._check_for_extra_schedulings()
-
-
- def test_atomic_group_scheduling_dependency_label_match(self):
- # A dependency label that exists on enough atomic group hosts in only
- # one of the two atomic group labels.
- job_c = self._create_job(synchronous=True, atomic_group=1)
- job_c.dependency_labels.add(self.label7)
- self._run_scheduler()
- self._assert_job_scheduled_on_number_of(job_c.id, (8, 9), 2)
- self._check_for_extra_schedulings()
-
-
- def test_atomic_group_scheduling_no_metahost(self):
- # Force it to schedule on the other group for a reliable test.
- self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9')
- # An atomic job without a metahost.
- job = self._create_job(synchronous=True, atomic_group=1)
- self._run_scheduler()
- self._assert_job_scheduled_on_number_of(job.id, (5, 6, 7), 2)
- self._check_for_extra_schedulings()
-
-
- def test_atomic_group_scheduling_partial_group(self):
- # Make one host in labels[3] unavailable so that there are only two
- # hosts left in the group.
- self._do_query('UPDATE afe_hosts SET status="Repair Failed" WHERE id=5')
- job = self._create_job(synchronous=True, metahosts=[self.label4.id],
- atomic_group=1)
- self._run_scheduler()
- # Verify that it was scheduled on the 2 ready hosts in that group.
- self._assert_job_scheduled_on(job.id, 6)
- self._assert_job_scheduled_on(job.id, 7)
- self._check_for_extra_schedulings()
-
-
- def test_atomic_group_scheduling_not_enough_available(self):
- # Mark some hosts in each atomic group label as not usable.
- # One host running, another invalid in the first group label.
- self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=5')
- self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=6')
- # One host invalid in the second group label.
- self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9')
- # Nothing to schedule when no group label has enough (2) good hosts..
- self._create_job(atomic_group=1, synchronous=True)
- self._run_scheduler()
- # There are not enough hosts in either atomic group,
- # No more scheduling should occur.
- self._check_for_extra_schedulings()
-
- # Now create an atomic job that has a synch count of 1. It should
- # schedule on exactly one of the hosts.
- onehost_job = self._create_job(atomic_group=1)
- self._run_scheduler()
- self._assert_job_scheduled_on_number_of(onehost_job.id, (7, 8), 1)
-
-
- def test_atomic_group_scheduling_no_valid_hosts(self):
- self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id in (8,9)')
- self._create_job(synchronous=True, metahosts=[self.label5.id],
- atomic_group=1)
- self._run_scheduler()
- # no hosts in the selected group and label are valid. no schedulings.
- self._check_for_extra_schedulings()
-
-
- def test_atomic_group_scheduling_metahost_works(self):
- # Test that atomic group scheduling also obeys metahosts.
- self._create_job(metahosts=[0], atomic_group=1)
- self._run_scheduler()
- # There are no atomic group hosts that also have that metahost.
- self._check_for_extra_schedulings()
-
- job_b = self._create_job(metahosts=[self.label5.id], atomic_group=1)
- self._run_scheduler()
- self._assert_job_scheduled_on(job_b.id, 8)
- self._assert_job_scheduled_on(job_b.id, 9)
- self._check_for_extra_schedulings()
-
-
- def test_atomic_group_skips_ineligible_hosts(self):
- # Test hosts marked ineligible for this job are not eligible.
- # How would this ever happen anyways?
- job = self._create_job(metahosts=[self.label4.id], atomic_group=1)
- models.IneligibleHostQueue.objects.create(job=job, host_id=5)
- models.IneligibleHostQueue.objects.create(job=job, host_id=6)
- models.IneligibleHostQueue.objects.create(job=job, host_id=7)
- self._run_scheduler()
- # No scheduling should occur as all desired hosts were ineligible.
- self._check_for_extra_schedulings()
-
-
- def test_atomic_group_scheduling_fail(self):
- # If synch_count is > the atomic group number of machines, the job
- # should be aborted immediately.
- model_job = self._create_job(synchronous=True, atomic_group=1)
- model_job.synch_count = 4
- model_job.save()
- job = scheduler_models.Job(id=model_job.id)
- self._run_scheduler()
- self._check_for_extra_schedulings()
- queue_entries = job.get_host_queue_entries()
- self.assertEqual(1, len(queue_entries))
- self.assertEqual(queue_entries[0].status,
- models.HostQueueEntry.Status.ABORTED)
-
-
- def test_atomic_group_no_labels_no_scheduling(self):
- # Never schedule on atomic groups marked invalid.
- job = self._create_job(metahosts=[self.label5.id], synchronous=True,
- atomic_group=1)
- # Deleting an atomic group via the frontend marks it invalid and
- # removes all label references to the group. The job now references
- # an invalid atomic group with no labels associated with it.
- self.label5.atomic_group.invalid = True
- self.label5.atomic_group.save()
- self.label5.atomic_group = None
- self.label5.save()
-
- self._run_scheduler()
- self._check_for_extra_schedulings()
-
-
- def test_schedule_directly_on_atomic_group_host_fail(self):
- # Scheduling a job directly on hosts in an atomic group must
- # fail to avoid users inadvertently holding up the use of an
- # entire atomic group by using the machines individually.
- job = self._create_job(hosts=[5])
- self._run_scheduler()
- self._check_for_extra_schedulings()
-
-
- def test_schedule_directly_on_atomic_group_host(self):
- # Scheduling a job directly on one host in an atomic group will
- # work when the atomic group is listed on the HQE in addition
- # to the host (assuming the sync count is 1).
- job = self._create_job(hosts=[5], atomic_group=1)
- self._run_scheduler()
- self._assert_job_scheduled_on(job.id, 5)
- self._check_for_extra_schedulings()
-
-
- def test_schedule_directly_on_atomic_group_hosts_sync2(self):
- job = self._create_job(hosts=[5,8], atomic_group=1, synchronous=True)
- self._run_scheduler()
- self._assert_job_scheduled_on(job.id, 5)
- self._assert_job_scheduled_on(job.id, 8)
- self._check_for_extra_schedulings()
-
-
- def test_schedule_directly_on_atomic_group_hosts_wrong_group(self):
- job = self._create_job(hosts=[5,8], atomic_group=2, synchronous=True)
- self._run_scheduler()
- self._check_for_extra_schedulings()
-
-
def test_only_schedule_queued_entries(self):
self._create_job(metahosts=[1])
self._update_hqe(set='active=1, host_id=2')
diff --git a/scheduler/rdb.py b/scheduler/rdb.py
new file mode 100644
index 0000000..8dc3bcd
--- /dev/null
+++ b/scheduler/rdb.py
@@ -0,0 +1,111 @@
+"""
+Rdb server module.
+"""
+import logging
+from autotest_lib.site_utils.graphite import stats
+
+
+_timer = stats.Timer('rdb')
+
+def _check_acls(job_acls, host_acls):
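+ # True-ish (non-zero) when the job and host share at least one acl group.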
+ if job_acls is None or host_acls is None:
+ return False
+ return len(host_acls.intersection(job_acls))
+
+
+def _check_deps(job_deps, host_labels):
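+ # True when every job dependency label is present on the host.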
+ if job_deps is None or host_labels is None:
+ return False
+ return len(job_deps - host_labels) == 0
+
+
+def validate_host_assignment(job_info, host_info):
+ """ Validate this job<->host pairing.
+
+ @param job_info: Information about the job as determined by
+ the client rdb module.
+ @param host_info: Information about the host as determined by
+ get_host_info.
+
+ @return: True if the job<->host pairing is valid, False otherwise.
+ Also False if we don't have enough information to make a decision.
+ """
+ one_time_host = host_info.get('invalid') and job_info.get('host_id')
+
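+ # Note: 'and' binds tighter than 'or', so the expression below evaluates
+ # as (acls-ok and deps-ok and not invalid) or (one_time_host and not
+ # locked).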
+ return (_check_acls(job_info.get('acls'), host_info.get('acls')) and
+ _check_deps(job_info.get('deps'), host_info.get('labels')) and
+ not host_info.get('invalid', True) or one_time_host and
+ not host_info.get('locked', True))
+
+
+def get_host_info(host_scheduler, host_id):
+ """
+ Utility method to parse information about a host into a dictionary.
+
+ Ideally this can just return the Host object, but doing this has the
+ following advantages:
+ 1. Changes to the schema will only require changes to this method.
+ 2. We can reimplement this method to make use of django caching.
+ 3. We can lock rows of the host table in a centralized location.
+
+ @param host_scheduler: The host scheduler holding the host acl/label and
+ availability caches.
+ @param host_id: id of the host to get information about.
+ @return: A dictionary containing all information needed to make a
+ scheduling decision regarding this host.
+ """
+ acls = host_scheduler._host_acls.get(host_id, set())
+ labels = host_scheduler._host_labels.get(host_id, set())
+ host_info = {'labels': labels, 'acls': acls}
+ host = host_scheduler._hosts_available.get(host_id)
+ if host:
+ host_info.update({'locked': host.locked, 'invalid': host.invalid})
+ return host_info
+
+
+def _order_labels(host_scheduler, labels):
+ """Given a list of labels, order them by available host count.
+
+ To make a scheduling decision, we need a host that matches all dependencies
+ of a job, hence the most restrictive search space we can use is the list
+ of ready hosts that have the least frequent label.
+
+ @param host_scheduler: The host scheduler holding the label to hosts map.
+ @param labels: A list of labels.
+ @return: The labels ordered by ascending available-host count; a label
+ with no available hosts sorts first.
+ """
+ label_count = [len(host_scheduler._label_hosts.get(label, []))
+ for label in labels]
+ return [label_tuple[1] for label_tuple in sorted(zip(label_count, labels))]
+
+
+@_timer.decorate
+def get_host(host_scheduler, job_info):
+ """
+ Get a host matching the job's selection criterion.
+
+ - Get all hosts in rarest label.
+ - Check which ones are still usable.
+ - Return the first of these hosts that passes our validity checks.
+
+ @param host_scheduler: The host scheduler holding host and label state.
+ @param job_info: A dictionary of job information needed to pick a host.
+
+ @return: A host object from the _hosts_available map, or None if no
+ eligible host is found.
+ """
+
+ # A job must have at least one dependency (e.g. 'board:') in order for us
+ # to find a host for it. To do so we use 2 data structures of the
+ # host_scheduler:
+ # - label to hosts map: to count label frequencies, and get hosts in a label
+ # - hosts_available map: to mark a host as used, since it would be difficult
+ # to delete this host from all the label keys it has in the label to
+ # hosts map.
+ rarest_label = _order_labels(host_scheduler, job_info.get('deps'))[0]
+
+ # TODO(beeps): Once we have implemented locking in afe_hosts make this:
+ # afe.models.Host.object.filter(locked).filter(acls).filter(labels)...
+ # where labels are chained according to frequency. Currently this will
+ # require a join across all hqes which could be expensive, and is
+ # unnecessary anyway since we need to move away from this scheduling model.
+ hosts_considered = host_scheduler._label_hosts.get(rarest_label, [])
+ for host_id in hosts_considered:
+ host = host_scheduler._hosts_available.get(host_id)
+ host_info = get_host_info(host_scheduler, host_id)
+ if host and validate_host_assignment(job_info, host_info):
+ return host
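To make the rarest-label-first ordering above concrete, a small standalone
sketch (the label -> host map is hypothetical, mirroring
host_scheduler._label_hosts):

    # Hypothetical label -> available-host-ids map.
    label_hosts = {'board:lumpy': [1, 2, 3], 'pool:bvt': [2], 'rps': []}

    def order_labels(labels):
        # Sort by ascending available-host count, rarest label first.
        return [label for _, label in
                sorted((len(label_hosts.get(label, [])), label)
                       for label in labels)]

    print(order_labels(['board:lumpy', 'pool:bvt', 'rps']))
    # -> ['rps', 'pool:bvt', 'board:lumpy']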
diff --git a/server/cros/dynamic_suite/suite.py b/server/cros/dynamic_suite/suite.py
index 3d0eb0c..6eb56a5 100644
--- a/server/cros/dynamic_suite/suite.py
+++ b/server/cros/dynamic_suite/suite.py
@@ -331,12 +331,17 @@
job_deps = []
else:
job_deps = list(test.dependencies)
-
if self._extra_deps:
job_deps.extend(self._extra_deps)
if self._pool:
job_deps.append(self._pool)
+ # TODO(beeps): Completely remove the concept of a metahost.
+ # Currently we use this to distinguish a job scheduled through
+ # the afe from a suite job, as only the latter will get requeued
+ # when a special task fails.
+ job_deps.append(self._board)
+
test_obj = self._afe.create_job(
control_file=test.text,
name='/'.join([self._build, self._tag, test.name]),
diff --git a/server/cros/dynamic_suite/suite_unittest.py b/server/cros/dynamic_suite/suite_unittest.py
index 160e445..b7add75 100644
--- a/server/cros/dynamic_suite/suite_unittest.py
+++ b/server/cros/dynamic_suite/suite_unittest.py
@@ -233,6 +233,7 @@
dependencies.extend(test.dependencies)
if suite_deps:
dependencies.extend(suite_deps)
+ dependencies.append(self._BOARD)
job_mock = self.afe.create_job(
control_file=test.text,
name=mox.And(mox.StrContains(self._BUILD),
diff --git a/utils/unittest_suite.py b/utils/unittest_suite.py
index e1552bf..97f4612 100755
--- a/utils/unittest_suite.py
+++ b/utils/unittest_suite.py
@@ -89,6 +89,7 @@
'chaos_base_test.py',
'chaos_interop_test.py',
'monitor_db_functional_test.py',
+ 'atomic_group_unittests.py',
# crbug.com/251395
'dev_server_test.py',
'full_release_test.py',