[autotest] RDB refactor I

Initial refactor for the rdb; implements stage 1 in this schematic:
https://x20web.corp.google.com/~beeps/rdb_v1_midway.jpg

Also achieves the following:
- Don't process an hqe more than once after assigning a host to it.
- Don't assign a host to a queued, aborted hqe.
- Drop the metahost concept.
- Stop using the label metahost scheduler to find hosts for non-metahost jobs.
- Include a database migration script for jobs still queued across the
  scheduler restart, since they will now need a meta_host dependency label.

This CL also removes the scheduler's ability to:
- Schedule an atomic group
 * Consequently, also the ability to block a host even when the hqe using it is
   no longer active.
- Schedule a metahost differently from a non-metahost
 * Both metahosts and non-metahosts are just labels now
 * Jobs which are already assigned hosts are still given precedence, though
- Schedule based on only_if_needed.

And fixes the unittests appropriately.

TEST=Ran suites, unittests. Restarted scheduler after applying these changes
     and tested migration. Ran suite scheduler.
BUG=chromium:314082,chromium:314219,chromium:313680,chromium:315824,chromium:312333
DEPLOY=scheduler, migrate

Change-Id: I70c3c3c740e51581db88fe3ce5879c53d6e6511e
Reviewed-on: https://chromium-review.googlesource.com/175957
Reviewed-by: Alex Miller <milleral@chromium.org>
Commit-Queue: Prashanth B <beeps@chromium.org>
Tested-by: Prashanth B <beeps@chromium.org>
diff --git a/frontend/afe/frontend_test_utils.py b/frontend/afe/frontend_test_utils.py
index 158f1ac..6f53af3 100644
--- a/frontend/afe/frontend_test_utils.py
+++ b/frontend/afe/frontend_test_utils.py
@@ -121,6 +121,12 @@
             reboot_before=model_attributes.RebootBefore.NEVER,
             drone_set=drone_set, control_file=control_file,
             parameterized_job=parameterized_job)
+
+        # Update the job's dependencies to include the metahost.
+        for metahost_label in metahosts:
+            dep = models.Label.objects.get(id=metahost_label)
+            job.dependency_labels.add(dep)
+
         for host_id in hosts:
             models.HostQueueEntry.objects.create(job=job, host_id=host_id,
                                                  status=status,
diff --git a/frontend/migrations/084_convert_metahost_to_label.py b/frontend/migrations/084_convert_metahost_to_label.py
new file mode 100644
index 0000000..9fb154c
--- /dev/null
+++ b/frontend/migrations/084_convert_metahost_to_label.py
@@ -0,0 +1,9 @@
+UP_SQL = """
+INSERT INTO afe_jobs_dependency_labels (job_id, label_id)
+SELECT job_id, meta_host FROM afe_host_queue_entries
+WHERE meta_host IS NOT NULL AND NOT complete AND NOT active
+AND status="Queued" AND NOT aborted;
+"""
+
+DOWN_SQL="""
+"""
diff --git a/scheduler/atomic_group_unittests.py b/scheduler/atomic_group_unittests.py
new file mode 100644
index 0000000..5a9e9c9
--- /dev/null
+++ b/scheduler/atomic_group_unittests.py
@@ -0,0 +1,329 @@
+#!/usr/bin/python
+#pylint: disable-msg=C0111
+
+import gc, time
+import common
+from autotest_lib.frontend import setup_django_environment
+from autotest_lib.frontend.afe import frontend_test_utils
+from autotest_lib.client.common_lib.test_utils import mock
+from autotest_lib.client.common_lib.test_utils import unittest
+from autotest_lib.database import database_connection
+from autotest_lib.frontend.afe import models
+from autotest_lib.scheduler import agent_task
+from autotest_lib.scheduler import monitor_db, drone_manager, email_manager
+from autotest_lib.scheduler import pidfile_monitor
+from autotest_lib.scheduler import scheduler_config, gc_stats, host_scheduler
+from autotest_lib.scheduler import monitor_db_functional_test
+from autotest_lib.scheduler import monitor_db_unittest
+from autotest_lib.scheduler import scheduler_models
+
+_DEBUG = False
+
+
+class AtomicGroupTest(monitor_db_unittest.DispatcherSchedulingTest):
+
+    def test_atomic_group_hosts_blocked_from_non_atomic_jobs(self):
+        # Create a job scheduled to run on label6.
+        self._create_job(metahosts=[self.label6.id])
+        self._run_scheduler()
+        # label6 only has hosts that are in atomic groups associated with it,
+        # there should be no scheduling.
+        self._check_for_extra_schedulings()
+
+
+    def test_atomic_group_hosts_blocked_from_non_atomic_jobs_explicit(self):
+        # Create a job scheduled to run on label5.  This is an atomic group
+        # label but this job does not request atomic group scheduling.
+        self._create_job(metahosts=[self.label5.id])
+        self._run_scheduler()
+        # label5 only has hosts that are in atomic groups associated with it,
+        # there should be no scheduling.
+        self._check_for_extra_schedulings()
+
+
+    def test_atomic_group_scheduling_basics(self):
+        # Create jobs scheduled to run on an atomic group.
+        job_a = self._create_job(synchronous=True, metahosts=[self.label4.id],
+                         atomic_group=1)
+        job_b = self._create_job(synchronous=True, metahosts=[self.label5.id],
+                         atomic_group=1)
+        self._run_scheduler()
+        # atomic_group.max_number_of_machines was 2 so we should run on 2.
+        self._assert_job_scheduled_on_number_of(job_a.id, (5, 6, 7), 2)
+        self._assert_job_scheduled_on(job_b.id, 8)  # label5
+        self._assert_job_scheduled_on(job_b.id, 9)  # label5
+        self._check_for_extra_schedulings()
+
+        # The three host label4 atomic group still has one host available.
+        # That means a job with a synch_count of 1 asking to be scheduled on
+        # the atomic group can still use the final machine.
+        #
+        # This may seem like a somewhat odd use case.  It allows the use of an
+        # atomic group as a set of machines to run smaller jobs within (a set
+        # of hosts configured for use in network tests with each other, perhaps?)
+        onehost_job = self._create_job(atomic_group=1)
+        self._run_scheduler()
+        self._assert_job_scheduled_on_number_of(onehost_job.id, (5, 6, 7), 1)
+        self._check_for_extra_schedulings()
+
+        # No more atomic groups have hosts available, no more jobs should
+        # be scheduled.
+        self._create_job(atomic_group=1)
+        self._run_scheduler()
+        self._check_for_extra_schedulings()
+
+
+    def test_atomic_group_scheduling_obeys_acls(self):
+        # Request scheduling on a specific atomic label but be denied by ACLs.
+        self._do_query('DELETE FROM afe_acl_groups_hosts '
+                       'WHERE host_id in (8,9)')
+        job = self._create_job(metahosts=[self.label5.id], atomic_group=1)
+        self._run_scheduler()
+        self._check_for_extra_schedulings()
+
+
+    def test_atomic_group_scheduling_dependency_label_exclude(self):
+        # A dependency label that matches no hosts in the atomic group.
+        job_a = self._create_job(atomic_group=1)
+        job_a.dependency_labels.add(self.label3)
+        self._run_scheduler()
+        self._check_for_extra_schedulings()
+
+
+    def test_atomic_group_scheduling_metahost_dependency_label_exclude(self):
+        # A metahost and dependency label that excludes too many hosts.
+        job_b = self._create_job(synchronous=True, metahosts=[self.label4.id],
+                                 atomic_group=1)
+        job_b.dependency_labels.add(self.label7)
+        self._run_scheduler()
+        self._check_for_extra_schedulings()
+
+
+    def test_atomic_group_scheduling_dependency_label_match(self):
+        # A dependency label that exists on enough atomic group hosts in only
+        # one of the two atomic group labels.
+        job_c = self._create_job(synchronous=True, atomic_group=1)
+        job_c.dependency_labels.add(self.label7)
+        self._run_scheduler()
+        self._assert_job_scheduled_on_number_of(job_c.id, (8, 9), 2)
+        self._check_for_extra_schedulings()
+
+
+    def test_atomic_group_scheduling_no_metahost(self):
+        # Force it to schedule on the other group for a reliable test.
+        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9')
+        # An atomic job without a metahost.
+        job = self._create_job(synchronous=True, atomic_group=1)
+        self._run_scheduler()
+        self._assert_job_scheduled_on_number_of(job.id, (5, 6, 7), 2)
+        self._check_for_extra_schedulings()
+
+
+    def test_atomic_group_scheduling_partial_group(self):
+        # Make one host in labels[3] unavailable so that there are only two
+        # hosts left in the group.
+        self._do_query('UPDATE afe_hosts SET status="Repair Failed" WHERE id=5')
+        job = self._create_job(synchronous=True, metahosts=[self.label4.id],
+                         atomic_group=1)
+        self._run_scheduler()
+        # Verify that it was scheduled on the 2 ready hosts in that group.
+        self._assert_job_scheduled_on(job.id, 6)
+        self._assert_job_scheduled_on(job.id, 7)
+        self._check_for_extra_schedulings()
+
+
+    def test_atomic_group_scheduling_not_enough_available(self):
+        # Mark some hosts in each atomic group label as not usable.
+        # One host running, another invalid in the first group label.
+        self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=5')
+        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=6')
+        # One host invalid in the second group label.
+        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9')
+        # Nothing to schedule when no group label has enough (2) good hosts.
+        self._create_job(atomic_group=1, synchronous=True)
+        self._run_scheduler()
+        # There are not enough hosts in either atomic group,
+        # No more scheduling should occur.
+        self._check_for_extra_schedulings()
+
+        # Now create an atomic job that has a synch count of 1.  It should
+        # schedule on exactly one of the hosts.
+        onehost_job = self._create_job(atomic_group=1)
+        self._run_scheduler()
+        self._assert_job_scheduled_on_number_of(onehost_job.id, (7, 8), 1)
+
+
+    def test_atomic_group_scheduling_no_valid_hosts(self):
+        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id in (8,9)')
+        self._create_job(synchronous=True, metahosts=[self.label5.id],
+                         atomic_group=1)
+        self._run_scheduler()
+        # no hosts in the selected group and label are valid.  no schedulings.
+        self._check_for_extra_schedulings()
+
+
+    def test_atomic_group_scheduling_metahost_works(self):
+        # Test that atomic group scheduling also obeys metahosts.
+        self._create_job(metahosts=[0], atomic_group=1)
+        self._run_scheduler()
+        # There are no atomic group hosts that also have that metahost.
+        self._check_for_extra_schedulings()
+
+        job_b = self._create_job(metahosts=[self.label5.id], atomic_group=1)
+        self._run_scheduler()
+        self._assert_job_scheduled_on(job_b.id, 8)
+        self._assert_job_scheduled_on(job_b.id, 9)
+        self._check_for_extra_schedulings()
+
+
+    def test_atomic_group_skips_ineligible_hosts(self):
+        # Test hosts marked ineligible for this job are not eligible.
+        # How would this ever happen anyways?
+        job = self._create_job(metahosts=[self.label4.id], atomic_group=1)
+        models.IneligibleHostQueue.objects.create(job=job, host_id=5)
+        models.IneligibleHostQueue.objects.create(job=job, host_id=6)
+        models.IneligibleHostQueue.objects.create(job=job, host_id=7)
+        self._run_scheduler()
+        # No scheduling should occur as all desired hosts were ineligible.
+        self._check_for_extra_schedulings()
+
+
+    def test_atomic_group_scheduling_fail(self):
+        # If synch_count is > the atomic group number of machines, the job
+        # should be aborted immediately.
+        model_job = self._create_job(synchronous=True, atomic_group=1)
+        model_job.synch_count = 4
+        model_job.save()
+        job = scheduler_models.Job(id=model_job.id)
+        self._run_scheduler()
+        self._check_for_extra_schedulings()
+        queue_entries = job.get_host_queue_entries()
+        self.assertEqual(1, len(queue_entries))
+        self.assertEqual(queue_entries[0].status,
+                         models.HostQueueEntry.Status.ABORTED)
+
+
+    def test_atomic_group_no_labels_no_scheduling(self):
+        # Never schedule on atomic groups marked invalid.
+        job = self._create_job(metahosts=[self.label5.id], synchronous=True,
+                               atomic_group=1)
+        # Deleting an atomic group via the frontend marks it invalid and
+        # removes all label references to the group.  The job now references
+        # an invalid atomic group with no labels associated with it.
+        self.label5.atomic_group.invalid = True
+        self.label5.atomic_group.save()
+        self.label5.atomic_group = None
+        self.label5.save()
+
+        self._run_scheduler()
+        self._check_for_extra_schedulings()
+
+
+    def test_schedule_directly_on_atomic_group_host_fail(self):
+        # Scheduling a job directly on hosts in an atomic group must
+        # fail to avoid users inadvertently holding up the use of an
+        # entire atomic group by using the machines individually.
+        job = self._create_job(hosts=[5])
+        self._run_scheduler()
+        self._check_for_extra_schedulings()
+
+
+    def test_schedule_directly_on_atomic_group_host(self):
+        # Scheduling a job directly on one host in an atomic group will
+        # work when the atomic group is listed on the HQE in addition
+        # to the host (assuming the sync count is 1).
+        job = self._create_job(hosts=[5], atomic_group=1)
+        self._run_scheduler()
+        self._assert_job_scheduled_on(job.id, 5)
+        self._check_for_extra_schedulings()
+
+
+    def test_schedule_directly_on_atomic_group_hosts_sync2(self):
+        job = self._create_job(hosts=[5,8], atomic_group=1, synchronous=True)
+        self._run_scheduler()
+        self._assert_job_scheduled_on(job.id, 5)
+        self._assert_job_scheduled_on(job.id, 8)
+        self._check_for_extra_schedulings()
+
+
+    def test_schedule_directly_on_atomic_group_hosts_wrong_group(self):
+        job = self._create_job(hosts=[5,8], atomic_group=2, synchronous=True)
+        self._run_scheduler()
+        self._check_for_extra_schedulings()
+
+
+    # TODO(gps): These should probably live in their own TestCase class
+    # specific to testing HostScheduler methods directly.  It was convenient
+    # to put it here for now to share existing test environment setup code.
+    def test_HostScheduler_check_atomic_group_labels(self):
+        normal_job = self._create_job(metahosts=[0])
+        atomic_job = self._create_job(atomic_group=1)
+        # Indirectly initialize the internal state of the host scheduler.
+        self._dispatcher._refresh_pending_queue_entries()
+
+        atomic_hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%d' %
+                                                     atomic_job.id)[0]
+        normal_hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%d' %
+                                                     normal_job.id)[0]
+
+        host_scheduler = self._dispatcher._host_scheduler
+        self.assertTrue(host_scheduler._check_atomic_group_labels(
+                [self.label4.id], atomic_hqe))
+        self.assertFalse(host_scheduler._check_atomic_group_labels(
+                [self.label4.id], normal_hqe))
+        self.assertFalse(host_scheduler._check_atomic_group_labels(
+                [self.label5.id, self.label6.id, self.label7.id], normal_hqe))
+        self.assertTrue(host_scheduler._check_atomic_group_labels(
+                [self.label4.id, self.label6.id], atomic_hqe))
+        self.assertTrue(host_scheduler._check_atomic_group_labels(
+                        [self.label4.id, self.label5.id],
+                        atomic_hqe))
+
+
+class OnlyIfNeededTest(monitor_db_unittest.DispatcherSchedulingTest):
+
+    def _setup_test_only_if_needed_labels(self):
+        # apply only_if_needed label3 to host1
+        models.Host.smart_get('host1').labels.add(self.label3)
+        return self._create_job_simple([1], use_metahost=True)
+
+
+    def test_only_if_needed_labels_avoids_host(self):
+        job = self._setup_test_only_if_needed_labels()
+        # if the job doesn't depend on label3, there should be no scheduling
+        self._run_scheduler()
+        self._check_for_extra_schedulings()
+
+
+    def test_only_if_needed_labels_schedules(self):
+        job = self._setup_test_only_if_needed_labels()
+        job.dependency_labels.add(self.label3)
+        self._run_scheduler()
+        self._assert_job_scheduled_on(1, 1)
+        self._check_for_extra_schedulings()
+
+
+    def test_only_if_needed_labels_via_metahost(self):
+        job = self._setup_test_only_if_needed_labels()
+        job.dependency_labels.add(self.label3)
+        # should also work if the metahost is the only_if_needed label
+        self._do_query('DELETE FROM afe_jobs_dependency_labels')
+        self._create_job(metahosts=[3])
+        self._run_scheduler()
+        self._assert_job_scheduled_on(2, 1)
+        self._check_for_extra_schedulings()
+
+
+    def test_metahosts_obey_blocks(self):
+        """
+        Metahosts can't get scheduled on hosts already scheduled for
+        that job.
+        """
+        self._create_job(metahosts=[1], hosts=[1])
+        # make the nonmetahost entry complete, so the metahost can try
+        # to get scheduled
+        self._update_hqe(set='complete = 1', where='host_id=1')
+        self._run_scheduler()
+        self._check_for_extra_schedulings()
+
+
diff --git a/scheduler/host_scheduler.py b/scheduler/host_scheduler.py
index da712c2..b22230b 100644
--- a/scheduler/host_scheduler.py
+++ b/scheduler/host_scheduler.py
@@ -1,5 +1,5 @@
 """
-Autotest scheduling utility.
+Autotest client module for the rdb.
 """
 
 
@@ -8,7 +8,7 @@
 from autotest_lib.client.common_lib import global_config, utils
 from autotest_lib.frontend.afe import models
 from autotest_lib.scheduler import metahost_scheduler, scheduler_config
-from autotest_lib.scheduler import scheduler_models
+from autotest_lib.scheduler import rdb, scheduler_models
 from autotest_lib.site_utils.graphite import stats
 from autotest_lib.server.cros import provision
 
@@ -350,12 +350,6 @@
         return host_object and host_object.invalid
 
 
-    def _schedule_non_metahost(self, queue_entry):
-        if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry):
-            return None
-        return self._hosts_available.pop(queue_entry.host_id, None)
-
-
     def is_host_usable(self, host_id):
         if host_id not in self._hosts_available:
             # host was already used during this scheduling cycle
@@ -367,17 +361,100 @@
         return True
 
 
+    def get_job_info(self, queue_entry):
+        """
+        Extract job information from a queue_entry/host-scheduler.
+
+        Unfortunately the information needed to choose hosts for a job
+        are split across several tables and not restricted to just the
+        hqe. At the very least we require the deps and acls of the job, but
+        we also need to know if the job has a host assigned to it. This method
+        consolidates this information into a light-weight dictionary that the
+        host_scheduler and the rdb can pass back and forth.
+
+        @param queue_entry: the queue_entry of the job we would like
+            information about.
+
+        @return: A dictionary containing: 1. the job's set of deps, 2. its
+            set of acls, 3. the id of the host assigned to the hqe, or None.
+        """
+        job_id = queue_entry.job_id
+        host_id = queue_entry.host_id
+        job_deps = self._job_dependencies.get(job_id, set())
+        job_deps = set([dep for dep in job_deps if
+                       not provision.can_provision(self._labels[dep].name)])
+        job_acls = self._job_acls.get(job_id, set())
+
+        return {'deps': set(job_deps),
+                'acls': set(job_acls),
+                'host_id': host_id}
+
+
     def schedule_entry(self, queue_entry):
-        logging.debug('Host_scheduler is scheduling entry: %s', queue_entry)
-        if queue_entry.host_id is not None:
-            return self._schedule_non_metahost(queue_entry)
+        """
+        Schedule an hqe against a host.
 
-        for scheduler in self._metahost_schedulers:
-            if scheduler.can_schedule_metahost(queue_entry):
-                scheduler.schedule_metahost(queue_entry, self)
+        An hqe can either have a host assigned to it or not. In either case,
+        however, actually scheduling the hqe on the host involves validating
+        the assignment by checking acls and labels. If the hqe doesn't have a
+        host, we need to find one before we can perform this validation.
+
+        If we successfully validate the host->hqe pairing, return the host. The
+        scheduler will not begin scheduling special tasks for the hqe until it
+        acquires a valid host.
+
+        @param queue_entry: The queue_entry that requires a host.
+        @return: The host assigned to the hqe, if any.
+        """
+        host_id = queue_entry.host_id
+        job_id = queue_entry.job_id
+        job_info = self.get_job_info(queue_entry)
+        host = None
+
+        if host_id:
+            host = self._hosts_available.get(host_id, None)
+
+            # TODO(beeps): Remove the need for 2 rdb calls. Ideally we should
+            # just do one call to validate the assignment, however, since we're
+            # currently still using the host_scheduler, we'd need to pass it
+            # as an argument to validate_host_assignment, which is less clean
+            # than just splitting this work into 2 calls.
+            host_info = rdb.get_host_info(self, host_id)
+
+            # If the host is either unavailable or ineligible for this job,
+            # defer scheduling this queue_entry until the next tick.
+            if (host is None or not
+                rdb.validate_host_assignment(job_info, host_info)):
                 return None
+        else:
+            host = rdb.get_host(self, job_info)
+            if host is None:
+                return None
+            queue_entry.set_host(host)
 
-        raise SchedulerError('No metahost scheduler to handle %s' % queue_entry)
+        # TODO(beeps): Make it so we don't need to set the hqe active status
+        # to remove a host from the active pool.
+        # A host will remain in the available pool for as long as its status
+        # is Ready and it is not referenced by an active hqe. The state of
+        # the host is not under our control, as it will only change to
+        # resetting etc whenever the prejob task starts. However, the hqe
+        # is theoretically active from the moment we assign a healthy host
+        # to it. Setting the host on an inactive hqe will not remove it
+        # from the available pool, leading to unnecessary scheduling
+        # overhead.
+        # Without this, we will process each hqe twice because it is still
+        # judged as 'new', and perform the host<->hqe assignment twice,
+        # because the host assigned to the hqe is still 'available', as
+        # the first prejob task only runs at the end of the next tick's
+        # handle_agents call. Note that the status is still 'Queued', and
+        # will remain 'Queued' until an agent changes it.
+        queue_entry.update_field('active', True)
+
+        # The available_hosts dictionary determines our scheduling decisions
+        # for subsequent jobs processed in this tick.
+        self._hosts_available.pop(host.id)
+        logging.debug('Scheduling job: %s, Host %s', job_id, host.id)
+        return host
 
 
     def find_eligible_atomic_group(self, queue_entry):
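
Taken together, schedule_entry now gives its caller a simple per-tick
contract, sketched below (a hypothetical driver loop; the real caller is
the dispatcher in monitor_db.py, shown next, and start_prejob_tasks is a
stand-in for the actual follow-up step):

    # Hypothetical driver loop illustrating the schedule_entry contract.
    for queue_entry in queue_entries:
        host = host_scheduler.schedule_entry(queue_entry)
        if host is None:
            # Validation failed or no host was free; the hqe stays
            # Queued and is retried on the next tick.
            continue
        # The hqe is now active with a host set, and the host has been
        # popped from the available pool, so neither is picked up
        # again this tick.
        start_prejob_tasks(queue_entry, host)  # hypothetical
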
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 29dc2c1..a0dff2b 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -753,10 +753,11 @@
                       'ISNULL(host_id), '
                       'ISNULL(meta_host), '
                       'job_id')
+        query = ('NOT complete AND NOT active AND status="Queued" '
+                 'AND NOT aborted')
         return list(scheduler_models.HostQueueEntry.fetch(
             joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
-            where='NOT complete AND NOT active AND status="Queued"',
-            order_by=sort_order))
+            where=query, order_by=sort_order))
 
 
     def _refresh_pending_queue_entries(self):
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py
index b4cfcda..403ea38 100755
--- a/scheduler/monitor_db_unittest.py
+++ b/scheduler/monitor_db_unittest.py
@@ -346,38 +346,6 @@
         self._test_obey_ACLs_helper(True)
 
 
-    def _setup_test_only_if_needed_labels(self):
-        # apply only_if_needed label3 to host1
-        models.Host.smart_get('host1').labels.add(self.label3)
-        return self._create_job_simple([1], use_metahost=True)
-
-
-    def test_only_if_needed_labels_avoids_host(self):
-        job = self._setup_test_only_if_needed_labels()
-        # if the job doesn't depend on label3, there should be no scheduling
-        self._run_scheduler()
-        self._check_for_extra_schedulings()
-
-
-    def test_only_if_needed_labels_schedules(self):
-        job = self._setup_test_only_if_needed_labels()
-        job.dependency_labels.add(self.label3)
-        self._run_scheduler()
-        self._assert_job_scheduled_on(1, 1)
-        self._check_for_extra_schedulings()
-
-
-    def test_only_if_needed_labels_via_metahost(self):
-        job = self._setup_test_only_if_needed_labels()
-        job.dependency_labels.add(self.label3)
-        # should also work if the metahost is the only_if_needed label
-        self._do_query('DELETE FROM afe_jobs_dependency_labels')
-        self._create_job(metahosts=[3])
-        self._run_scheduler()
-        self._assert_job_scheduled_on(2, 1)
-        self._check_for_extra_schedulings()
-
-
     def test_nonmetahost_over_metahost(self):
         """
         Non-metahost entries should take priority over metahost entries
@@ -390,46 +358,6 @@
         self._check_for_extra_schedulings()
 
 
-    def test_metahosts_obey_blocks(self):
-        """
-        Metahosts can't get scheduled on hosts already scheduled for
-        that job.
-        """
-        self._create_job(metahosts=[1], hosts=[1])
-        # make the nonmetahost entry complete, so the metahost can try
-        # to get scheduled
-        self._update_hqe(set='complete = 1', where='host_id=1')
-        self._run_scheduler()
-        self._check_for_extra_schedulings()
-
-
-    # TODO(gps): These should probably live in their own TestCase class
-    # specific to testing HostScheduler methods directly.  It was convenient
-    # to put it here for now to share existing test environment setup code.
-    def test_HostScheduler_check_atomic_group_labels(self):
-        normal_job = self._create_job(metahosts=[0])
-        atomic_job = self._create_job(atomic_group=1)
-        # Indirectly initialize the internal state of the host scheduler.
-        self._dispatcher._refresh_pending_queue_entries()
-
-        atomic_hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%d' %
-                                                     atomic_job.id)[0]
-        normal_hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%d' %
-                                                     normal_job.id)[0]
-
-        host_scheduler = self._dispatcher._host_scheduler
-        self.assertTrue(host_scheduler._check_atomic_group_labels(
-                [self.label4.id], atomic_hqe))
-        self.assertFalse(host_scheduler._check_atomic_group_labels(
-                [self.label4.id], normal_hqe))
-        self.assertFalse(host_scheduler._check_atomic_group_labels(
-                [self.label5.id, self.label6.id, self.label7.id], normal_hqe))
-        self.assertTrue(host_scheduler._check_atomic_group_labels(
-                [self.label4.id, self.label6.id], atomic_hqe))
-        self.assertTrue(host_scheduler._check_atomic_group_labels(
-                        [self.label4.id, self.label5.id],
-                        atomic_hqe))
-
 #    TODO: Revive this test.
 #    def test_HostScheduler_get_host_atomic_group_id(self):
 #        job = self._create_job(metahosts=[self.label6.id])
@@ -470,237 +398,6 @@
 #        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(
 #                [self.label7.id, self.label5.id]))
 
-
-    def test_atomic_group_hosts_blocked_from_non_atomic_jobs(self):
-        # Create a job scheduled to run on label6.
-        self._create_job(metahosts=[self.label6.id])
-        self._run_scheduler()
-        # label6 only has hosts that are in atomic groups associated with it,
-        # there should be no scheduling.
-        self._check_for_extra_schedulings()
-
-
-    def test_atomic_group_hosts_blocked_from_non_atomic_jobs_explicit(self):
-        # Create a job scheduled to run on label5.  This is an atomic group
-        # label but this job does not request atomic group scheduling.
-        self._create_job(metahosts=[self.label5.id])
-        self._run_scheduler()
-        # label6 only has hosts that are in atomic groups associated with it,
-        # there should be no scheduling.
-        self._check_for_extra_schedulings()
-
-
-    def test_atomic_group_scheduling_basics(self):
-        # Create jobs scheduled to run on an atomic group.
-        job_a = self._create_job(synchronous=True, metahosts=[self.label4.id],
-                         atomic_group=1)
-        job_b = self._create_job(synchronous=True, metahosts=[self.label5.id],
-                         atomic_group=1)
-        self._run_scheduler()
-        # atomic_group.max_number_of_machines was 2 so we should run on 2.
-        self._assert_job_scheduled_on_number_of(job_a.id, (5, 6, 7), 2)
-        self._assert_job_scheduled_on(job_b.id, 8)  # label5
-        self._assert_job_scheduled_on(job_b.id, 9)  # label5
-        self._check_for_extra_schedulings()
-
-        # The three host label4 atomic group still has one host available.
-        # That means a job with a synch_count of 1 asking to be scheduled on
-        # the atomic group can still use the final machine.
-        #
-        # This may seem like a somewhat odd use case.  It allows the use of an
-        # atomic group as a set of machines to run smaller jobs within (a set
-        # of hosts configured for use in network tests with eachother perhaps?)
-        onehost_job = self._create_job(atomic_group=1)
-        self._run_scheduler()
-        self._assert_job_scheduled_on_number_of(onehost_job.id, (5, 6, 7), 1)
-        self._check_for_extra_schedulings()
-
-        # No more atomic groups have hosts available, no more jobs should
-        # be scheduled.
-        self._create_job(atomic_group=1)
-        self._run_scheduler()
-        self._check_for_extra_schedulings()
-
-
-    def test_atomic_group_scheduling_obeys_acls(self):
-        # Request scheduling on a specific atomic label but be denied by ACLs.
-        self._do_query('DELETE FROM afe_acl_groups_hosts '
-                       'WHERE host_id in (8,9)')
-        job = self._create_job(metahosts=[self.label5.id], atomic_group=1)
-        self._run_scheduler()
-        self._check_for_extra_schedulings()
-
-
-    def test_atomic_group_scheduling_dependency_label_exclude(self):
-        # A dependency label that matches no hosts in the atomic group.
-        job_a = self._create_job(atomic_group=1)
-        job_a.dependency_labels.add(self.label3)
-        self._run_scheduler()
-        self._check_for_extra_schedulings()
-
-
-    def test_atomic_group_scheduling_metahost_dependency_label_exclude(self):
-        # A metahost and dependency label that excludes too many hosts.
-        job_b = self._create_job(synchronous=True, metahosts=[self.label4.id],
-                                 atomic_group=1)
-        job_b.dependency_labels.add(self.label7)
-        self._run_scheduler()
-        self._check_for_extra_schedulings()
-
-
-    def test_atomic_group_scheduling_dependency_label_match(self):
-        # A dependency label that exists on enough atomic group hosts in only
-        # one of the two atomic group labels.
-        job_c = self._create_job(synchronous=True, atomic_group=1)
-        job_c.dependency_labels.add(self.label7)
-        self._run_scheduler()
-        self._assert_job_scheduled_on_number_of(job_c.id, (8, 9), 2)
-        self._check_for_extra_schedulings()
-
-
-    def test_atomic_group_scheduling_no_metahost(self):
-        # Force it to schedule on the other group for a reliable test.
-        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9')
-        # An atomic job without a metahost.
-        job = self._create_job(synchronous=True, atomic_group=1)
-        self._run_scheduler()
-        self._assert_job_scheduled_on_number_of(job.id, (5, 6, 7), 2)
-        self._check_for_extra_schedulings()
-
-
-    def test_atomic_group_scheduling_partial_group(self):
-        # Make one host in labels[3] unavailable so that there are only two
-        # hosts left in the group.
-        self._do_query('UPDATE afe_hosts SET status="Repair Failed" WHERE id=5')
-        job = self._create_job(synchronous=True, metahosts=[self.label4.id],
-                         atomic_group=1)
-        self._run_scheduler()
-        # Verify that it was scheduled on the 2 ready hosts in that group.
-        self._assert_job_scheduled_on(job.id, 6)
-        self._assert_job_scheduled_on(job.id, 7)
-        self._check_for_extra_schedulings()
-
-
-    def test_atomic_group_scheduling_not_enough_available(self):
-        # Mark some hosts in each atomic group label as not usable.
-        # One host running, another invalid in the first group label.
-        self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=5')
-        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=6')
-        # One host invalid in the second group label.
-        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9')
-        # Nothing to schedule when no group label has enough (2) good hosts..
-        self._create_job(atomic_group=1, synchronous=True)
-        self._run_scheduler()
-        # There are not enough hosts in either atomic group,
-        # No more scheduling should occur.
-        self._check_for_extra_schedulings()
-
-        # Now create an atomic job that has a synch count of 1.  It should
-        # schedule on exactly one of the hosts.
-        onehost_job = self._create_job(atomic_group=1)
-        self._run_scheduler()
-        self._assert_job_scheduled_on_number_of(onehost_job.id, (7, 8), 1)
-
-
-    def test_atomic_group_scheduling_no_valid_hosts(self):
-        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id in (8,9)')
-        self._create_job(synchronous=True, metahosts=[self.label5.id],
-                         atomic_group=1)
-        self._run_scheduler()
-        # no hosts in the selected group and label are valid.  no schedulings.
-        self._check_for_extra_schedulings()
-
-
-    def test_atomic_group_scheduling_metahost_works(self):
-        # Test that atomic group scheduling also obeys metahosts.
-        self._create_job(metahosts=[0], atomic_group=1)
-        self._run_scheduler()
-        # There are no atomic group hosts that also have that metahost.
-        self._check_for_extra_schedulings()
-
-        job_b = self._create_job(metahosts=[self.label5.id], atomic_group=1)
-        self._run_scheduler()
-        self._assert_job_scheduled_on(job_b.id, 8)
-        self._assert_job_scheduled_on(job_b.id, 9)
-        self._check_for_extra_schedulings()
-
-
-    def test_atomic_group_skips_ineligible_hosts(self):
-        # Test hosts marked ineligible for this job are not eligible.
-        # How would this ever happen anyways?
-        job = self._create_job(metahosts=[self.label4.id], atomic_group=1)
-        models.IneligibleHostQueue.objects.create(job=job, host_id=5)
-        models.IneligibleHostQueue.objects.create(job=job, host_id=6)
-        models.IneligibleHostQueue.objects.create(job=job, host_id=7)
-        self._run_scheduler()
-        # No scheduling should occur as all desired hosts were ineligible.
-        self._check_for_extra_schedulings()
-
-
-    def test_atomic_group_scheduling_fail(self):
-        # If synch_count is > the atomic group number of machines, the job
-        # should be aborted immediately.
-        model_job = self._create_job(synchronous=True, atomic_group=1)
-        model_job.synch_count = 4
-        model_job.save()
-        job = scheduler_models.Job(id=model_job.id)
-        self._run_scheduler()
-        self._check_for_extra_schedulings()
-        queue_entries = job.get_host_queue_entries()
-        self.assertEqual(1, len(queue_entries))
-        self.assertEqual(queue_entries[0].status,
-                         models.HostQueueEntry.Status.ABORTED)
-
-
-    def test_atomic_group_no_labels_no_scheduling(self):
-        # Never schedule on atomic groups marked invalid.
-        job = self._create_job(metahosts=[self.label5.id], synchronous=True,
-                               atomic_group=1)
-        # Deleting an atomic group via the frontend marks it invalid and
-        # removes all label references to the group.  The job now references
-        # an invalid atomic group with no labels associated with it.
-        self.label5.atomic_group.invalid = True
-        self.label5.atomic_group.save()
-        self.label5.atomic_group = None
-        self.label5.save()
-
-        self._run_scheduler()
-        self._check_for_extra_schedulings()
-
-
-    def test_schedule_directly_on_atomic_group_host_fail(self):
-        # Scheduling a job directly on hosts in an atomic group must
-        # fail to avoid users inadvertently holding up the use of an
-        # entire atomic group by using the machines individually.
-        job = self._create_job(hosts=[5])
-        self._run_scheduler()
-        self._check_for_extra_schedulings()
-
-
-    def test_schedule_directly_on_atomic_group_host(self):
-        # Scheduling a job directly on one host in an atomic group will
-        # work when the atomic group is listed on the HQE in addition
-        # to the host (assuming the sync count is 1).
-        job = self._create_job(hosts=[5], atomic_group=1)
-        self._run_scheduler()
-        self._assert_job_scheduled_on(job.id, 5)
-        self._check_for_extra_schedulings()
-
-
-    def test_schedule_directly_on_atomic_group_hosts_sync2(self):
-        job = self._create_job(hosts=[5,8], atomic_group=1, synchronous=True)
-        self._run_scheduler()
-        self._assert_job_scheduled_on(job.id, 5)
-        self._assert_job_scheduled_on(job.id, 8)
-        self._check_for_extra_schedulings()
-
-
-    def test_schedule_directly_on_atomic_group_hosts_wrong_group(self):
-        job = self._create_job(hosts=[5,8], atomic_group=2, synchronous=True)
-        self._run_scheduler()
-        self._check_for_extra_schedulings()
-
-
     def test_only_schedule_queued_entries(self):
         self._create_job(metahosts=[1])
         self._update_hqe(set='active=1, host_id=2')
diff --git a/scheduler/rdb.py b/scheduler/rdb.py
new file mode 100644
index 0000000..8dc3bcd
--- /dev/null
+++ b/scheduler/rdb.py
@@ -0,0 +1,111 @@
+"""
+Rdb server module.
+"""
+import logging
+from autotest_lib.site_utils.graphite import stats
+
+
+_timer = stats.Timer('rdb')
+
+def _check_acls(job_acls, host_acls):
+    if job_acls is None or host_acls is None:
+        return False
+    return len(host_acls.intersection(job_acls))
+
+
+def _check_deps(job_deps, host_labels):
+    if job_deps is None or host_labels is None:
+        return False
+    return len(job_deps - host_labels) == 0
+
+
+def validate_host_assignment(job_info, host_info):
+    """ Validate this job<->host pairing.
+
+    @param job_info: Information about the job as determined by
+                     the client rdb module.
+    @param host_info: Information about the host as determined by
+                      get_host_info.
+
+    @return: True if the job<->host pairing is valid, False otherwise,
+             including when we don't have enough information to decide.
+    """
+    one_time_host = host_info.get('invalid') and job_info.get('host_id')
+
+    return (_check_acls(job_info.get('acls'), host_info.get('acls')) and
+            _check_deps(job_info.get('deps'), host_info.get('labels')) and
+            (not host_info.get('invalid', True) or one_time_host) and
+            not host_info.get('locked', True))
+
+
+def get_host_info(host_scheduler, host_id):
+    """
+    Utility method to parse information about a host into a dictionary.
+
+    Ideally this could just return the Host object, but returning a
+    dictionary has the following advantages:
+        1. Changes to the schema will only require changes to this method.
+        2. We can reimplement this method to make use of django caching.
+        3. We can lock rows of the host table in a centralized location.
+
+    @param host_id: id of the host to get information about.
+    @return: A dictionary containing all information needed to make a
+             scheduling decision regarding this host.
+    """
+    acls = host_scheduler._host_acls.get(host_id, set())
+    labels = host_scheduler._host_labels.get(host_id, set())
+    host_info = {'labels': labels, 'acls': acls}
+    host = host_scheduler._hosts_available.get(host_id)
+    if host:
+        host_info.update({'locked': host.locked, 'invalid': host.invalid})
+    return host_info
+
+
+def _order_labels(host_scheduler, labels):
+    """Given a list of labels, order them by available host count.
+
+    To make a scheduling decision, we need a host that matches all dependencies
+    of a job, hence the most restrictive search space we can use is the list
+    of ready hosts that have the least frequent label.
+
+    @param labels: A list of labels to order.
+    @return: The labels ordered by ascending available host count.
+    """
+    label_count = [len(host_scheduler._label_hosts.get(label, []))
+                   for label in labels]
+    return [label_tuple[1] for label_tuple in sorted(zip(label_count, labels))]
+
+
+@_timer.decorate
+def get_host(host_scheduler, job_info):
+    """
+    Get a host matching the job's selection criterion.
+
+    - Get all hosts in rarest label.
+    - Check which ones are still usable.
+    - Return the first of these hosts that passes our validity checks.
+
+    @param job_info: A dictionary of job information needed to pick a host.
+
+    @return: A host object from the available_hosts map.
+    """
+
+    # A job must have at least one dependency (e.g. 'board:') in order for us
+    # find a host for it. To do so we use 2 data structures of host_scheduler:
+    # - label to hosts map: to count label frequencies, and get hosts in a label
+    # - hosts_available map: to mark a host as used, as it would be difficult
+    #   to delete this host from all the label keys it has, in the label to
+    #   hosts map.
+    rarest_label = _order_labels(host_scheduler, job_info.get('deps'))[0]
+
+    # TODO(beeps): Once we have implemented locking in afe_hosts make this:
+    # afe.models.Host.object.filter(locked).filter(acls).filter(labels)...
+    # where labels are chained according to frequency. Currently this will
+    # require a join across all hqes which could be expensive, and is
+    # unnecessary anyway since we need to move away from this scheduling model.
+    hosts_considered = host_scheduler._label_hosts.get(rarest_label, [])
+    for host_id in hosts_considered:
+        host = host_scheduler._hosts_available.get(host_id)
+        host_info = get_host_info(host_scheduler, host_id)
+        if host and validate_host_assignment(job_info, host_info):
+            return host
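
To make the job_info/host_info contract concrete, here is a minimal
sketch (hypothetical ids; the dictionary keys mirror those produced by
get_job_info and get_host_info above):

    from autotest_lib.scheduler import rdb

    # Hypothetical job: requires labels {1, 2}, is in acl group 10,
    # and has no host pre-assigned.
    job_info = {'deps': set([1, 2]), 'acls': set([10]), 'host_id': None}

    # Hypothetical host: carries a superset of the job's deps, shares
    # acl group 10, and is neither invalid nor locked.
    host_info = {'labels': set([1, 2, 3]), 'acls': set([10, 11]),
                 'invalid': False, 'locked': False}

    # acls intersect, deps are a subset of labels, host is usable.
    assert rdb.validate_host_assignment(job_info, host_info)

    # An invalid host that is not a one-time host (the job has no
    # pre-assigned host_id) fails the same validation.
    host_info['invalid'] = True
    assert not rdb.validate_host_assignment(job_info, host_info)
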
diff --git a/server/cros/dynamic_suite/suite.py b/server/cros/dynamic_suite/suite.py
index 3d0eb0c..6eb56a5 100644
--- a/server/cros/dynamic_suite/suite.py
+++ b/server/cros/dynamic_suite/suite.py
@@ -331,12 +331,17 @@
             job_deps = []
         else:
             job_deps = list(test.dependencies)
-
         if self._extra_deps:
             job_deps.extend(self._extra_deps)
         if self._pool:
             job_deps.append(self._pool)
 
+        # TODO(beeps): Completely remove the concept of a metahost.
+        # Currently we use this to distinguish a job scheduled through
+        # the afe from a suite job, as only the latter will get requeued
+        # when a special task fails.
+        job_deps.append(self._board)
+
         test_obj = self._afe.create_job(
             control_file=test.text,
             name='/'.join([self._build, self._tag, test.name]),
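
With the board label appended, the dependency list of each child job ends
up looking something like the following (illustrative values only; the
exact strings depend on the control file, pool, and board):

    # e.g. test deps + pool + the board label appended above
    job_deps = ['cleanup-reboot', 'pool:bvt', 'board:lumpy']
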
diff --git a/server/cros/dynamic_suite/suite_unittest.py b/server/cros/dynamic_suite/suite_unittest.py
index 160e445..b7add75 100644
--- a/server/cros/dynamic_suite/suite_unittest.py
+++ b/server/cros/dynamic_suite/suite_unittest.py
@@ -233,6 +233,7 @@
                 dependencies.extend(test.dependencies)
             if suite_deps:
                 dependencies.extend(suite_deps)
+            dependencies.append(self._BOARD)
             job_mock = self.afe.create_job(
                 control_file=test.text,
                 name=mox.And(mox.StrContains(self._BUILD),
diff --git a/utils/unittest_suite.py b/utils/unittest_suite.py
index e1552bf..97f4612 100755
--- a/utils/unittest_suite.py
+++ b/utils/unittest_suite.py
@@ -89,6 +89,7 @@
     'chaos_base_test.py',
     'chaos_interop_test.py',
     'monitor_db_functional_test.py',
+    'atomic_group_unittests.py',
     # crbug.com/251395
     'dev_server_test.py',
     'full_release_test.py',