[autotest] Split host acquisition and job scheduling.

This is phase one of two in the plan to split host acquisition out of the
scheduler's tick. The idea is to have the host scheduler use a job query
manager to query the database for new jobs without hosts and assign
hosts to them, while the main scheduler uses the same query managers to
look for hostless jobs.

Currently the main scheduler uses the class to acquire hosts inline,
like it always has, and will continue to do so till the
inline_host_acquisition feature flag is turned on via the shadow_config.

TEST=Ran the scheduler, suites, unittests.
BUG=chromium:344613
DEPLOY=Scheduler

Change-Id: I542e4d1e509c16cac7354810416ee18ac940a7cf
Reviewed-on: https://chromium-review.googlesource.com/199383
Reviewed-by: Prashanth B <beeps@chromium.org>
Commit-Queue: Prashanth B <beeps@chromium.org>
Tested-by: Prashanth B <beeps@chromium.org>
diff --git a/scheduler/host_scheduler.py b/scheduler/host_scheduler.py
index 5ebdd7c..30f2569 100644
--- a/scheduler/host_scheduler.py
+++ b/scheduler/host_scheduler.py
@@ -1,56 +1,31 @@
 """Autotest host scheduler.
 """
 
-
+import collections
 import logging
 
-from autotest_lib.client.common_lib import global_config, utils
-from autotest_lib.frontend.afe import models
-from autotest_lib.scheduler import metahost_scheduler, scheduler_config
-from autotest_lib.scheduler import scheduler_models
+from autotest_lib.scheduler import query_managers
+from autotest_lib.scheduler import rdb_lib
+from autotest_lib.scheduler import rdb_utils
 from autotest_lib.site_utils.graphite import stats
-from autotest_lib.server.cros import provision
 
 
-get_site_metahost_schedulers = utils.import_site_function(
-        __file__, 'autotest_lib.scheduler.site_metahost_scheduler',
-        'get_metahost_schedulers', lambda : ())
+class BaseHostScheduler(object):
+    """Base class containing host acquisition logic.
 
-
-class BaseHostScheduler(metahost_scheduler.HostSchedulingUtility):
-    """Handles the logic for choosing when to run jobs and on which hosts.
-
-    This class makes several queries to the database on each tick, building up
-    some auxiliary data structures and using them to determine which hosts are
-    eligible to run which jobs, taking into account all the various factors that
-    affect that.
-
-    In the past this was done with one or two very large, complex database
-    queries.  It has proven much simpler and faster to build these auxiliary
-    data structures and perform the logic in Python.
+    This class contains all the core host acquisition logic needed by the
+    scheduler to run jobs on hosts. It is only capable of releasing hosts
+    back to the rdb through its tick; any other action must be instigated by
+    the job scheduler.
     """
 
 
-    _timer = stats.Timer('host_scheduler')
+    _timer = stats.Timer('base_host_scheduler')
+    host_assignment = collections.namedtuple('host_assignment', ['host', 'job'])
 
 
-    def __init__(self, db):
-        self._db = db
-        self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers()
-
-        # load site-specific scheduler selected in global_config
-        site_schedulers_str = global_config.global_config.get_config_value(
-                scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers',
-                default='')
-        site_schedulers = set(site_schedulers_str.split(','))
-        for scheduler in get_site_metahost_schedulers():
-            if type(scheduler).__name__ in site_schedulers:
-                # always prepend, so site schedulers take precedence
-                self._metahost_schedulers = (
-                        [scheduler] + self._metahost_schedulers)
-        logging.info('Metahost schedulers: %s',
-                     ', '.join(type(scheduler).__name__ for scheduler
-                               in self._metahost_schedulers))
+    def __init__(self):
+        self.host_query_manager = query_managers.AFEHostQueryManager()
 
 
     @_timer.decorate
@@ -60,424 +35,75 @@
         Release all hosts that are ready and are currently not being used by an
         active hqe, and don't have a new special task scheduled against them.
         """
-        # Avoid any host with a currently active queue entry against it.
-        hqe_join = ('LEFT JOIN afe_host_queue_entries AS active_hqe '
-                    'ON (afe_hosts.id = active_hqe.host_id AND '
-                    'active_hqe.active)')
-
-        # Avoid any host with a new special task against it. There are 2 cases
-        # when an inactive but incomplete special task will not use the host
-        # this tick: 1. When the host is locked 2. When an active hqe already
-        # has special tasks for the same host. In both these cases this host
-        # will not be in the ready hosts list anyway. In all other cases,
-        # an incomplete special task will grab the host before a new job does
-        # by assigning an agent to it.
-        special_task_join = ('LEFT JOIN afe_special_tasks as new_tasks '
-                             'ON (afe_hosts.id = new_tasks.host_id AND '
-                             'new_tasks.is_complete=0)')
-
-        hosts = scheduler_models.Host.fetch(
-            joins='%s %s' % (hqe_join, special_task_join),
-            where="active_hqe.host_id IS NULL AND new_tasks.host_id IS NULL "
-                  "AND afe_hosts.leased "
-                  "AND NOT afe_hosts.locked "
-                  "AND (afe_hosts.status IS NULL "
-                          "OR afe_hosts.status = 'Ready')")
-
-        for ready_host in hosts:
-            ready_host.update_field('leased', 0)
+        release_hostnames = [host.hostname for host in
+                             self.host_query_manager.find_unused_healty_hosts()]
+        if release_hostnames:
+            self.host_query_manager.set_leased(
+                    False, hostname__in=release_hostnames)
 
 
-    @_timer.decorate
-    def _get_ready_hosts(self):
-        # We don't lose anything by re-doing these checks
-        # even though we release hosts on the same conditions.
-        # In the future we might have multiple clients that
-        # release_hosts and/or lock them independent of the
-        # scheduler tick.
-        hosts = scheduler_models.Host.fetch(
-            where="NOT afe_hosts.leased "
-                  "AND NOT afe_hosts.locked "
-                  "AND (afe_hosts.status IS NULL "
-                      "OR afe_hosts.status = 'Ready')")
-        return dict((host.id, host) for host in hosts)
+    @classmethod
+    def schedule_host_job(cls, host, queue_entry):
+        """Schedule a job on a host.
 
+        Scheduling a job involves:
+            1. Setting the active bit on the queue_entry.
+            2. Scheduling a special task on behalf of the queue_entry.
+        Performing these actions will lead the job scheduler through a chain of
+        events, culminating in running the test and collecting results from
+        the host.
 
-    def _get_sql_id_list(self, id_list):
-        return ','.join(str(item_id) for item_id in id_list)
-
-
-    def _get_many2many_dict(self, query, id_list, flip=False):
-        if not id_list:
-            return {}
-        query %= self._get_sql_id_list(id_list)
-        rows = self._db.execute(query)
-        return self._process_many2many_dict(rows, flip)
-
-
-    def _process_many2many_dict(self, rows, flip=False):
-        result = {}
-        for row in rows:
-            left_id, right_id = int(row[0]), int(row[1])
-            if flip:
-                left_id, right_id = right_id, left_id
-            result.setdefault(left_id, set()).add(right_id)
-        return result
-
-
-    @_timer.decorate
-    def _get_job_acl_groups(self, job_ids):
-        query = """
-        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
-        FROM afe_jobs
-        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
-        INNER JOIN afe_acl_groups_users ON
-                afe_acl_groups_users.user_id = afe_users.id
-        WHERE afe_jobs.id IN (%s)
+        @param host: The host against which to schedule the job.
+        @param queue_entry: The queue_entry to schedule.
         """
-        return self._get_many2many_dict(query, job_ids)
+        if queue_entry.host_id is None:
+            queue_entry.set_host(host)
+        elif host.id != queue_entry.host_id:
+                raise rdb_utils.RDBException('The rdb returned host: %s '
+                        'but the job:%s was already assigned a host: %s. ' %
+                        (host.hostname, queue_entry.job_id,
+                         queue_entry.host.hostname))
+        queue_entry.update_field('active', True)
+
+        # TODO: crbug.com/373936. The host scheduler should only be assigning
+        # jobs to hosts, but the criterion we use to release hosts depends
+        # on it not being used by an active hqe. Since we're activating the
+        # hqe here, we also need to schedule its first prejob task. OTOH,
+        # we could converge to having the host scheduler manage all special
+        # tasks, since their only use today is to verify/cleanup/reset a host.
+        logging.info('Scheduling pre job tasks for entry: %s', queue_entry)
+        queue_entry.schedule_pre_job_tasks()
 
 
-    @_timer.decorate
-    def _get_job_ineligible_hosts(self, job_ids):
-        query = """
-        SELECT job_id, host_id
-        FROM afe_ineligible_host_queues
-        WHERE job_id IN (%s)
+    @classmethod
+    def find_hosts_for_jobs(cls, host_jobs):
+        """Find and verify hosts for a list of jobs.
+
+        @param host_jobs: A list of queue entries that either require hosts,
+            or require host assignment validation through the rdb.
+        @return: A list of tuples of the form (host, queue_entry) for each
+            valid host-queue_entry assignment.
         """
-        return self._get_many2many_dict(query, job_ids)
-
-
-    @_timer.decorate
-    def _get_job_dependencies(self, job_ids):
-        query = """
-        SELECT job_id, label_id
-        FROM afe_jobs_dependency_labels
-        WHERE job_id IN (%s)
-        """
-        return self._get_many2many_dict(query, job_ids)
-
-
-    @_timer.decorate
-    def _get_host_acls(self, host_ids):
-        query = """
-        SELECT host_id, aclgroup_id
-        FROM afe_acl_groups_hosts
-        WHERE host_id IN (%s)
-        """
-        return self._get_many2many_dict(query, host_ids)
-
-
-    @_timer.decorate
-    def _get_label_hosts(self, host_ids):
-        if not host_ids:
-            return {}, {}
-        query = """
-        SELECT label_id, host_id
-        FROM afe_hosts_labels
-        WHERE host_id IN (%s)
-        """ % self._get_sql_id_list(host_ids)
-        rows = self._db.execute(query)
-        labels_to_hosts = self._process_many2many_dict(rows)
-        hosts_to_labels = self._process_many2many_dict(rows, flip=True)
-        return labels_to_hosts, hosts_to_labels
-
-
-    @_timer.decorate
-    def _get_labels(self, job_dependencies):
-        """
-        Calculate a dict mapping label id to label object so that we don't
-        frequently round trip to the database every time we need a label.
-
-        @param job_dependencies: A dict mapping an integer job id to a list of
-            integer label id's.  ie. {job_id: [label_id]}
-        @return: A dict mapping an integer label id to a scheduler model label
-            object.  ie. {label_id: label_object}
-
-        """
-        id_to_label = dict()
-        # Pull all the labels on hosts we might look at
-        host_labels = scheduler_models.Label.fetch(
-                where="id IN (SELECT label_id FROM afe_hosts_labels)")
-        id_to_label.update([(label.id, label) for label in host_labels])
-        # and pull all the labels on jobs we might look at.
-        job_label_set = set()
-        for job_deps in job_dependencies.values():
-            job_label_set.update(job_deps)
-        # On the rare/impossible chance that no jobs have any labels, we
-        # can skip this.
-        if job_label_set:
-            job_string_label_list = ','.join([str(x) for x in job_label_set])
-            job_labels = scheduler_models.Label.fetch(
-                    where="id IN (%s)" % job_string_label_list)
-            id_to_label.update([(label.id, label) for label in job_labels])
-        return id_to_label
-
-
-    def recovery_on_startup(self):
-        for metahost_scheduler in self._metahost_schedulers:
-            metahost_scheduler.recovery_on_startup()
-
-
-    @_timer.decorate
-    def refresh(self, pending_queue_entries):
-        self._hosts_available = self._get_ready_hosts()
-
-        relevant_jobs = [queue_entry.job_id
-                         for queue_entry in pending_queue_entries]
-        self._job_acls = self._get_job_acl_groups(relevant_jobs)
-        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
-        self._job_dependencies = self._get_job_dependencies(relevant_jobs)
-
-        host_ids = self._hosts_available.keys()
-        self._host_acls = self._get_host_acls(host_ids)
-        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)
-
-        self._labels = self._get_labels(self._job_dependencies)
+        jobs_with_hosts = []
+        hosts = rdb_lib.acquire_hosts(host_jobs)
+        for host, job in zip(hosts, host_jobs):
+            if host:
+                jobs_with_hosts.append(cls.host_assignment(host, job))
+        return jobs_with_hosts
 
 
     @_timer.decorate
     def tick(self):
+        """Schedule core host management activities."""
         self._release_hosts()
 
 
-    def hosts_in_label(self, label_id):
-        return set(self._label_hosts.get(label_id, ()))
+class DummyHostScheduler(BaseHostScheduler):
+    """A dummy host scheduler that doesn't acquire or release hosts."""
+
+    def __init__(self):
+        pass
 
 
-    def remove_host_from_label(self, host_id, label_id):
-        self._label_hosts[label_id].remove(host_id)
-
-
-    def pop_host(self, host_id):
-        return self._hosts_available.pop(host_id)
-
-
-    def ineligible_hosts_for_entry(self, queue_entry):
-        return set(self._ineligible_hosts.get(queue_entry.job_id, ()))
-
-
-    def _is_acl_accessible(self, host_id, queue_entry):
-        job_acls = self._job_acls.get(queue_entry.job_id, set())
-        host_acls = self._host_acls.get(host_id, set())
-        return len(host_acls.intersection(job_acls)) > 0
-
-
-    def _check_job_dependencies(self, job_dependencies, host_labels):
-        missing = job_dependencies - host_labels
-        return len(missing) == 0
-
-
-    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
-                                     queue_entry):
-        if not queue_entry.meta_host:
-            # bypass only_if_needed labels when a specific host is selected
-            return True
-
-        for label_id in host_labels:
-            label = self._labels[label_id]
-            if not label.only_if_needed:
-                # we don't care about non-only_if_needed labels
-                continue
-            if queue_entry.meta_host == label_id:
-                # if the label was requested in a metahost it's OK
-                continue
-            if label_id not in job_dependencies:
-                return False
-        return True
-
-
-    def _check_atomic_group_labels(self, host_labels, queue_entry):
-        """
-        Determine if the given HostQueueEntry's atomic group settings are okay
-        to schedule on a host with the given labels.
-
-        @param host_labels: A list of label ids that the host has.
-        @param queue_entry: The HostQueueEntry being considered for the host.
-
-        @returns True if atomic group settings are okay, False otherwise.
-        """
-        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
-                queue_entry.atomic_group_id)
-
-
-    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
-        """
-        Return the atomic group label id for a host with the given set of
-        labels if any, or None otherwise.  Raises an exception if more than
-        one atomic group are found in the set of labels.
-
-        @param host_labels: A list of label ids that the host has.
-        @param queue_entry: The HostQueueEntry we're testing.  Only used for
-                extra info in a potential logged error message.
-
-        @returns The id of the atomic group found on a label in host_labels
-                or None if no atomic group label is found.
-        """
-        atomic_labels = [self._labels[label_id] for label_id in host_labels
-                         if self._labels[label_id].atomic_group_id is not None]
-        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
-        if not atomic_ids:
-            return None
-        if len(atomic_ids) > 1:
-            logging.error('More than one Atomic Group on HQE "%s" via: %r',
-                          queue_entry, atomic_labels)
-        return atomic_ids.pop()
-
-
-    def _get_atomic_group_labels(self, atomic_group_id):
-        """
-        Lookup the label ids that an atomic_group is associated with.
-
-        @param atomic_group_id - The id of the AtomicGroup to look up.
-
-        @returns A generator yeilding Label ids for this atomic group.
-        """
-        return (id for id, label in self._labels.iteritems()
-                if label.atomic_group_id == atomic_group_id
-                and not label.invalid)
-
-
-    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
-        """
-        @param group_hosts - A sequence of Host ids to test for usability
-                and eligibility against the Job associated with queue_entry.
-        @param queue_entry - The HostQueueEntry that these hosts are being
-                tested for eligibility against.
-
-        @returns A subset of group_hosts Host ids that are eligible for the
-                supplied queue_entry.
-        """
-        return set(host_id for host_id in group_hosts
-                   if self.is_host_usable(host_id)
-                   and self.is_host_eligible_for_job(host_id, queue_entry))
-
-
-    def is_host_eligible_for_job(self, host_id, queue_entry):
-        if self._is_host_invalid(host_id):
-            # if an invalid host is scheduled for a job, it's a one-time host
-            # and it therefore bypasses eligibility checks. note this can only
-            # happen for non-metahosts, because invalid hosts have their label
-            # relationships cleared.
-            return True
-
-        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
-        # Remove provisionable labels from the set of job_dependencies that we
-        # need to satisfy
-        job_dependencies = set([dep for dep in job_dependencies if
-                not provision.is_for_special_action(self._labels[dep].name)])
-        host_labels = self._host_labels.get(host_id, set())
-
-        return (self._is_acl_accessible(host_id, queue_entry) and
-                self._check_job_dependencies(job_dependencies, host_labels) and
-                self._check_only_if_needed_labels(
-                    job_dependencies, host_labels, queue_entry) and
-                self._check_atomic_group_labels(host_labels, queue_entry))
-
-
-    def _is_host_invalid(self, host_id):
-        host_object = self._hosts_available.get(host_id, None)
-        return host_object and host_object.invalid
-
-
-    def is_host_usable(self, host_id):
-        if host_id not in self._hosts_available:
-            # host was already used during this scheduling cycle
-            return False
-        if self._hosts_available[host_id].invalid:
-            # Invalid hosts cannot be used for metahosts.  They're included in
-            # the original query because they can be used by non-metahosts.
-            return False
-        return True
-
-
-
-
-    def find_eligible_atomic_group(self, queue_entry):
-        """
-        Given an atomic group host queue entry, locate an appropriate group
-        of hosts for the associated job to run on.
-
-        The caller is responsible for creating new HQEs for the additional
-        hosts returned in order to run the actual job on them.
-
-        @returns A list of Host instances in a ready state to satisfy this
-                atomic group scheduling.  Hosts will all belong to the same
-                atomic group label as specified by the queue_entry.
-                An empty list will be returned if no suitable atomic
-                group could be found.
-
-        TODO(gps): what is responsible for kicking off any attempted repairs on
-        a group of hosts?  not this function, but something needs to.  We do
-        not communicate that reason for returning [] outside of here...
-        For now, we'll just be unschedulable if enough hosts within one group
-        enter Repair Failed state.
-        """
-        assert queue_entry.atomic_group_id is not None
-        job = queue_entry.job
-        assert job.synch_count and job.synch_count > 0
-        atomic_group = queue_entry.atomic_group
-        if job.synch_count > atomic_group.max_number_of_machines:
-            # Such a Job and HostQueueEntry should never be possible to
-            # create using the frontend.  Regardless, we can't process it.
-            # Abort it immediately and log an error on the scheduler.
-            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
-            logging.error(
-                'Error: job %d synch_count=%d > requested atomic_group %d '
-                'max_number_of_machines=%d.  Aborted host_queue_entry %d.',
-                job.id, job.synch_count, atomic_group.id,
-                 atomic_group.max_number_of_machines, queue_entry.id)
-            return []
-        hosts_in_label = self.hosts_in_label(queue_entry.meta_host)
-        ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry)
-
-        # Look in each label associated with atomic_group until we find one with
-        # enough hosts to satisfy the job.
-        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
-            group_hosts = set(self.hosts_in_label(atomic_label_id))
-            if queue_entry.meta_host is not None:
-                # If we have a metahost label, only allow its hosts.
-                group_hosts.intersection_update(hosts_in_label)
-            group_hosts -= ineligible_host_ids
-            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
-                    group_hosts, queue_entry)
-
-            # Job.synch_count is treated as "minimum synch count" when
-            # scheduling for an atomic group of hosts.  The atomic group
-            # number of machines is the maximum to pick out of a single
-            # atomic group label for scheduling at one time.
-            min_hosts = job.synch_count
-            max_hosts = atomic_group.max_number_of_machines
-
-            if len(eligible_host_ids_in_group) < min_hosts:
-                # Not enough eligible hosts in this atomic group label.
-                continue
-
-            eligible_hosts_in_group = [self._hosts_available[id]
-                                       for id in eligible_host_ids_in_group]
-            # So that they show up in a sane order when viewing the job.
-            eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort)
-
-            # Limit ourselves to scheduling the atomic group size.
-            if len(eligible_hosts_in_group) > max_hosts:
-                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]
-
-            # Remove the selected hosts from our cached internal state
-            # of available hosts in order to return the Host objects.
-            host_list = []
-            for host in eligible_hosts_in_group:
-                hosts_in_label.discard(host.id)
-                self._hosts_available.pop(host.id)
-                host_list.append(host)
-            return host_list
-
-        return []
-
-
-site_host_scheduler = utils.import_site_class(
-        __file__, 'autotest_lib.scheduler.site_host_scheduler',
-        'site_host_scheduler', BaseHostScheduler)
-
-
-class HostScheduler(site_host_scheduler):
-    pass
+    def tick(self):
+        pass
diff --git a/scheduler/host_scheduler_unittests.py b/scheduler/host_scheduler_unittests.py
new file mode 100644
index 0000000..3c7637b
--- /dev/null
+++ b/scheduler/host_scheduler_unittests.py
@@ -0,0 +1,150 @@
+#!/usr/bin/python
+#pylint: disable-msg=C0111
+
+# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import mock
+
+import common
+
+from autotest_lib.client.common_lib.test_utils import unittest
+from autotest_lib.frontend import setup_django_environment
+from autotest_lib.frontend.afe import frontend_test_utils
+from autotest_lib.frontend.afe import models
+from autotest_lib.scheduler import rdb
+from autotest_lib.scheduler import rdb_lib
+from autotest_lib.scheduler import rdb_testing_utils
+
+
+class QueryManagerTests(rdb_testing_utils.AbstractBaseRDBTester,
+                        unittest.TestCase):
+    """Verify scheduler behavior when pending jobs are already given hosts."""
+
+    _config_section = 'AUTOTEST_WEB'
+
+
+    def testPendingQueueEntries(self):
+        """Test retrieval of pending queue entries."""
+        job = self.create_job(deps=set(['a']))
+
+        # Check that we don't pull the job we just created with only_hostless.
+        jobs_with_hosts = self.job_query_manager.get_pending_queue_entries(
+                only_hostless=True)
+        self.assertTrue(len(jobs_with_hosts) == 0)
+
+        # Check that only_hostless=False pulls new jobs, as always.
+        jobs_without_hosts = self.job_query_manager.get_pending_queue_entries(
+                only_hostless=False)
+        self.assertTrue(jobs_without_hosts[0].id == job.id and
+                        jobs_without_hosts[0].host_id is None)
+
+
+    def testHostQueries(self):
+        """Verify that the host query manager maintains its data structures."""
+        # Create a job and use the host_query_managers internal datastructures
+        # to retrieve its job info.
+        job = self.create_job(
+                deps=rdb_testing_utils.DEFAULT_DEPS,
+                acls=rdb_testing_utils.DEFAULT_ACLS)
+        queue_entries = self._dispatcher._refresh_pending_queue_entries()
+        job_manager = rdb_lib.JobQueryManager(queue_entries)
+        job_info = job_manager.get_job_info(queue_entries[0])
+        default_dep_ids = set([label.id for label in self.db_helper.get_labels(
+                name__in=rdb_testing_utils.DEFAULT_DEPS)])
+        default_acl_ids = set([acl.id for acl in self.db_helper.get_acls(
+                name__in=rdb_testing_utils.DEFAULT_ACLS)])
+        self.assertTrue(set(job_info['deps']) == default_dep_ids)
+        self.assertTrue(set(job_info['acls']) == default_acl_ids)
+
+
+    def testNewJobsWithHosts(self):
+        """Test that we handle inactive hqes with unleased hosts correctly."""
+        # Create a job and assign it an unleased host, then check that the
+        # HQE becomes active and the host remains assigned to it.
+        job = self.create_job(deps=['a'])
+        host = self.db_helper.create_host('h1', deps=['a'])
+        self.db_helper.add_host_to_job(host, job.id)
+
+        queue_entries = self._dispatcher._refresh_pending_queue_entries()
+        self._dispatcher._schedule_new_jobs()
+
+        host = self.db_helper.get_host(hostname='h1')[0]
+        self.assertTrue(host.leased == True and
+                        host.status == models.Host.Status.READY)
+        hqes = list(self.db_helper.get_hqes(host_id=host.id))
+        self.assertTrue(len(hqes) == 1 and hqes[0].active and
+                        hqes[0].status == models.HostQueueEntry.Status.QUEUED)
+
+
+    def testNewJobsWithInvalidHost(self):
+        """Test handling of inactive hqes assigned invalid, unleased hosts."""
+        # Create a job and assign it an unleased host, then check that the
+        # HQE DOES NOT become active, because we validate the
+        # assignment again.
+        job = self.create_job(deps=['a'])
+        host = self.db_helper.create_host('h1', deps=['b'])
+        self.db_helper.add_host_to_job(host, job.id)
+
+        queue_entries = self._dispatcher._refresh_pending_queue_entries()
+        self._dispatcher._schedule_new_jobs()
+
+        host = self.db_helper.get_host(hostname='h1')[0]
+        self.assertTrue(host.leased == False and
+                        host.status == models.Host.Status.READY)
+        hqes = list(self.db_helper.get_hqes(host_id=host.id))
+        self.assertTrue(len(hqes) == 1 and not hqes[0].active and
+                        hqes[0].status == models.HostQueueEntry.Status.QUEUED)
+
+
+    def testNewJobsWithLeasedHost(self):
+        """Test handling of inactive hqes assigned leased hosts."""
+        # Create a job and assign it a leased host, then check that the
+        # HQE does not become active through the scheduler, and that the
+        # host gets released.
+        job = self.create_job(deps=['a'])
+        host = self.db_helper.create_host('h1', deps=['b'])
+        self.db_helper.add_host_to_job(host, job.id)
+        host.leased = 1
+        host.save()
+
+        rdb.batch_acquire_hosts = mock.MagicMock()
+        queue_entries = self._dispatcher._refresh_pending_queue_entries()
+        self._dispatcher._schedule_new_jobs()
+        self.assertTrue(rdb.batch_acquire_hosts.call_count == 0)
+        host = self.db_helper.get_host(hostname='h1')[0]
+        self.assertTrue(host.leased == True and
+                        host.status == models.Host.Status.READY)
+        hqes = list(self.db_helper.get_hqes(host_id=host.id))
+        self.assertTrue(len(hqes) == 1 and not hqes[0].active and
+                        hqes[0].status == models.HostQueueEntry.Status.QUEUED)
+        self.host_scheduler._release_hosts()
+        self.assertTrue(self.db_helper.get_host(hostname='h1')[0].leased == 0)
+
+
+    def testSpecialTaskOrdering(self):
+        """Test priority ordering of special tasks."""
+
+        # Create 2 special tasks, one with and one without an hqe.
+        # Then assign the same host to another active hqe and make
+        # sure we don't try scheduling either of these special tasks.
+        host = self.db_helper.create_host('h1', deps=['a'])
+        task1 = self.db_helper.create_special_task(host_id=host.id)
+        job = self.create_job(deps=['a'])
+        self.db_helper.add_host_to_job(host, job.id)
+        hqe = self.db_helper.get_hqes(job=job.id)[0]
+        task2 = self.db_helper.create_special_task(job.id)
+        tasks = self.job_query_manager.get_prioritized_special_tasks()
+        self.assertTrue(tasks[0].queue_entry_id is None and
+                        tasks[1].queue_entry_id == hqe.id)
+
+        job2 = self.create_job(deps=['a'])
+        self.db_helper.add_host_to_job(host, job2.id)
+        hqe2 = self.db_helper.get_hqes(job=job2.id)[0]
+        hqe2.status = models.HostQueueEntry.Status.RUNNING
+        hqe2.save()
+        tasks = self.job_query_manager.get_prioritized_special_tasks()
+        self.assertTrue(tasks == [])
+
+
diff --git a/scheduler/metahost_scheduler.py b/scheduler/metahost_scheduler.py
deleted file mode 100644
index 98f49be..0000000
--- a/scheduler/metahost_scheduler.py
+++ /dev/null
@@ -1,108 +0,0 @@
-from autotest_lib.client.common_lib import utils
-
-class HostSchedulingUtility(object):
-    """Interface to host availability information from the scheduler."""
-    def hosts_in_label(self, label_id):
-        """Return potentially usable hosts with the given label."""
-        raise NotImplementedError
-
-
-    def remove_host_from_label(self, host_id, label_id):
-        """Remove this host from the internal list of usable hosts in the label.
-
-        This is provided as an optimization -- when code gets a host from a
-        label and concludes it's unusable, it can call this to avoid getting
-        that host again in the future (within this tick).  This function should
-        not affect correctness.
-        """
-        raise NotImplementedError
-
-
-    def pop_host(self, host_id):
-        """Remove and return a host from the internal list of available hosts.
-        """
-        raise NotImplementedError
-
-
-    def ineligible_hosts_for_entry(self, queue_entry):
-        """Get the list of hosts ineligible to run the given queue entry."""
-        raise NotImplementedError
-
-
-    def is_host_usable(self, host_id):
-        """Determine if the host is currently usable at all."""
-        raise NotImplementedError
-
-
-    def is_host_eligible_for_job(self, host_id, queue_entry):
-        """Determine if the host is eligible specifically for this queue entry.
-
-        @param queue_entry: a HostQueueEntry DBObject
-        """
-        raise NotImplementedError
-
-
-class MetahostScheduler(object):
-    def can_schedule_metahost(self, queue_entry):
-        """Return true if this object can schedule the given queue entry.
-
-        At most one MetahostScheduler should return true for any given entry.
-
-        @param queue_entry: a HostQueueEntry DBObject
-        """
-        raise NotImplementedError
-
-
-    def schedule_metahost(self, queue_entry, scheduling_utility):
-        """Schedule the given queue entry, if possible.
-
-        This method should make necessary database changes culminating in
-        assigning a host to the given queue entry in the database.  It may
-        take no action if no host can be assigned currently.
-
-        @param queue_entry: a HostQueueEntry DBObject
-        @param scheduling_utility: a HostSchedulingUtility object
-        """
-        raise NotImplementedError
-
-
-    def recovery_on_startup(self):
-        """Perform any necessary recovery upon scheduler startup."""
-        pass
-
-
-    def tick(self):
-        """Called once per scheduler cycle; any actions are allowed."""
-        pass
-
-
-class LabelMetahostScheduler(MetahostScheduler):
-    def can_schedule_metahost(self, queue_entry):
-        return bool(queue_entry.meta_host)
-
-
-    def schedule_metahost(self, queue_entry, scheduling_utility):
-        label_id = queue_entry.meta_host
-        hosts_in_label = scheduling_utility.hosts_in_label(label_id)
-        ineligible_host_ids = scheduling_utility.ineligible_hosts_for_entry(
-                queue_entry)
-
-        for host_id in hosts_in_label:
-            if not scheduling_utility.is_host_usable(host_id):
-                scheduling_utility.remove_host_from_label(host_id, label_id)
-                continue
-            if host_id in ineligible_host_ids:
-                continue
-            if not scheduling_utility.is_host_eligible_for_job(host_id,
-                                                               queue_entry):
-                continue
-
-            # Remove the host from our cached internal state before returning
-            scheduling_utility.remove_host_from_label(host_id, label_id)
-            host = scheduling_utility.pop_host(host_id)
-            queue_entry.set_host(host)
-            return
-
-
-def get_metahost_schedulers():
-    return [LabelMetahostScheduler()]
diff --git a/scheduler/metahost_scheduler_unittest.py b/scheduler/metahost_scheduler_unittest.py
deleted file mode 100755
index c2ec728..0000000
--- a/scheduler/metahost_scheduler_unittest.py
+++ /dev/null
@@ -1,81 +0,0 @@
-#!/usr/bin/python
-
-import common
-import unittest
-from autotest_lib.client.common_lib.test_utils import mock
-from autotest_lib.frontend import setup_django_environment
-from autotest_lib.frontend import setup_test_environment
-from autotest_lib.scheduler import metahost_scheduler, scheduler_models
-
-class LabelMetahostSchedulerTest(unittest.TestCase):
-    def setUp(self):
-        self.god = mock.mock_god()
-        self.scheduling_utility = self.god.create_mock_class(
-                metahost_scheduler.HostSchedulingUtility, 'utility')
-        self.metahost_scheduler = metahost_scheduler.LabelMetahostScheduler()
-
-
-    def tearDown(self):
-        self.god.unstub_all()
-
-
-    def entry(self):
-        return self.god.create_mock_class(scheduler_models.HostQueueEntry,
-                                          'entry')
-
-
-    def test_can_schedule_metahost(self):
-        entry = self.entry()
-        entry.meta_host = None
-        self.assertFalse(self.metahost_scheduler.can_schedule_metahost(entry))
-
-        entry.meta_host = 1
-        self.assert_(self.metahost_scheduler.can_schedule_metahost(entry))
-
-
-    def test_schedule_metahost(self):
-        entry = self.entry()
-        entry.meta_host = 1
-        host = object()
-
-        self.scheduling_utility.hosts_in_label.expect_call(1).and_return(
-                [2, 3, 4, 5])
-        # 2 is in ineligible_hosts
-        (self.scheduling_utility.ineligible_hosts_for_entry.expect_call(entry)
-         .and_return([2]))
-        self.scheduling_utility.is_host_usable.expect_call(2).and_return(True)
-        # 3 is unusable
-        self.scheduling_utility.is_host_usable.expect_call(3).and_return(False)
-        self.scheduling_utility.remove_host_from_label.expect_call(3, 1)
-        # 4 is ineligible for the job
-        self.scheduling_utility.is_host_usable.expect_call(4).and_return(True)
-        (self.scheduling_utility.is_host_eligible_for_job.expect_call(4, entry)
-         .and_return(False))
-        # 5 runs
-        self.scheduling_utility.is_host_usable.expect_call(5).and_return(True)
-        (self.scheduling_utility.is_host_eligible_for_job.expect_call(5, entry)
-         .and_return(True))
-        self.scheduling_utility.remove_host_from_label.expect_call(5, 1)
-        self.scheduling_utility.pop_host.expect_call(5).and_return(host)
-        entry.set_host.expect_call(host)
-
-        self.metahost_scheduler.schedule_metahost(entry,
-                                                  self.scheduling_utility)
-        self.god.check_playback()
-
-
-    def test_no_hosts(self):
-        entry = self.entry()
-        entry.meta_host = 1
-
-        self.scheduling_utility.hosts_in_label.expect_call(1).and_return(())
-        (self.scheduling_utility.ineligible_hosts_for_entry.expect_call(entry)
-         .and_return(()))
-
-        self.metahost_scheduler.schedule_metahost(entry,
-                                                  self.scheduling_utility)
-        self.god.check_playback()
-
-
-if __name__ == '__main__':
-    unittest.main()
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 31efe83..4afd8c5 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -21,11 +21,11 @@
 from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
 from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
 from autotest_lib.scheduler import postjob_task
-from autotest_lib.scheduler import rdb_lib
-from autotest_lib.scheduler import rdb_utils
+from autotest_lib.scheduler import query_managers
 from autotest_lib.scheduler import scheduler_lib
 from autotest_lib.scheduler import scheduler_models
 from autotest_lib.scheduler import status_server, scheduler_config
+from autotest_lib.scheduler import scheduler_lib
 from autotest_lib.server import autoserv_utils
 from autotest_lib.site_utils.graphite import stats
 
@@ -33,7 +33,6 @@
 PID_FILE_PREFIX = 'monitor_db'
 
 RESULTS_DIR = '.'
-DB_CONFIG_SECTION = 'AUTOTEST_WEB'
 AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
 
 if os.environ.has_key('AUTOTEST_DIR'):
@@ -59,6 +58,10 @@
 _autoserv_path = autoserv_utils.autoserv_path
 _testing_mode = False
 _drone_manager = None
+_inline_host_acquisition = global_config.global_config.get_config_value(
+        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
+        default=True)
+
 
 def _parser_path_default(install_dir):
     return os.path.join(install_dir, 'tko', 'parse')
@@ -183,12 +186,12 @@
 
     if _testing_mode:
         global_config.global_config.override_config_value(
-            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')
+            scheduler_lib.DB_CONFIG_SECTION, 'database',
+            'stresstest_autotest_web')
 
     os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
     global _db_manager
     _db_manager = scheduler_lib.ConnectionManager()
-
     logging.info("Setting signal handler")
     signal.signal(signal.SIGINT, handle_sigint)
 
@@ -235,8 +238,6 @@
     def __init__(self):
         self._agents = []
         self._last_clean_time = time.time()
-        self._host_scheduler = host_scheduler.HostScheduler(
-                _db_manager.get_connection())
         user_cleanup_time = scheduler_config.config.clean_interval
         self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                 _db_manager.get_connection(), user_cleanup_time)
@@ -257,6 +258,15 @@
                 scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                 default=False)
 
+        # If _inline_host_acquisition is set the scheduler will acquire and
+        # release hosts against jobs inline, with the tick. Otherwise the
+        # scheduler will only focus on jobs that already have hosts, and
+        # will not explicitly unlease a host when a job finishes using it.
+        self._job_query_manager = query_managers.AFEJobQueryManager()
+        self._host_scheduler = (host_scheduler.BaseHostScheduler()
+                                if _inline_host_acquisition else
+                                host_scheduler.DummyHostScheduler())
+
 
     def initialize(self, recover_hosts=True):
         self._periodic_cleanup.initialize()
@@ -268,8 +278,6 @@
         if recover_hosts:
             self._recover_hosts()
 
-        self._host_scheduler.recovery_on_startup()
-
 
     def _log_tick_msg(self, msg):
         if self._tick_debug:
@@ -623,38 +631,6 @@
                     (len(unrecovered_hqes), message))
 
 
-    def _get_prioritized_special_tasks(self):
-        """
-        Returns all queued SpecialTasks prioritized for repair first, then
-        cleanup, then verify.
-
-        @return: list of afe.models.SpecialTasks sorted according to priority.
-        """
-        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
-                                                         is_complete=False,
-                                                         host__locked=False)
-        # exclude hosts with active queue entries unless the SpecialTask is for
-        # that queue entry
-        queued_tasks = models.SpecialTask.objects.add_join(
-                queued_tasks, 'afe_host_queue_entries', 'host_id',
-                join_condition='afe_host_queue_entries.active',
-                join_from_key='host_id', force_left_join=True)
-        queued_tasks = queued_tasks.extra(
-                where=['(afe_host_queue_entries.id IS NULL OR '
-                       'afe_host_queue_entries.id = '
-                               'afe_special_tasks.queue_entry_id)'])
-
-        # reorder tasks by priority
-        task_priority_order = [models.SpecialTask.Task.REPAIR,
-                               models.SpecialTask.Task.CLEANUP,
-                               models.SpecialTask.Task.VERIFY,
-                               models.SpecialTask.Task.RESET,
-                               models.SpecialTask.Task.PROVISION]
-        def task_priority_key(task):
-            return task_priority_order.index(task.task)
-        return sorted(queued_tasks, key=task_priority_key)
-
-
     def _schedule_special_tasks(self):
         """
         Execute queued SpecialTasks that are ready to run on idle hosts.
@@ -665,7 +641,7 @@
         adds them to the dispatchers agents list, so _handle_agents can execute
         them.
         """
-        for task in self._get_prioritized_special_tasks():
+        for task in self._job_query_manager.get_prioritized_special_tasks():
             if self.host_has_agent(task.host):
                 continue
             self.add_agent_task(self._get_agent_task_for_special_task(task))
@@ -705,60 +681,6 @@
                                    print_message=message)
 
 
-
-    def _get_pending_queue_entries(self):
-        """
-        Fetch a list of new host queue entries.
-
-        The ordering of this list is important, as every new agent
-        we schedule can potentially contribute to the process count
-        on the drone, which has a static limit. The sort order
-        prioritizes jobs as follows:
-        1. High priority jobs: Based on the afe_job's priority
-        2. With hosts and metahosts: This will only happen if we don't
-            activate the hqe after assigning a host to it in
-            schedule_new_jobs.
-        3. With hosts but without metahosts: When tests are scheduled
-            through the frontend the owner of the job would have chosen
-            a host for it.
-        4. Without hosts but with metahosts: This is the common case of
-            a new test that needs a DUT. We assign a host and set it to
-            active so it shouldn't show up in case 2 on the next tick.
-        5. Without hosts and without metahosts: Hostless suite jobs, that
-            will result in new jobs that fall under category 4.
-
-        A note about the ordering of cases 3 and 4:
-        Prioritizing one case above the other leads to earlier acquisition
-        of the following resources: 1. process slots on the drone 2. machines.
-        - When a user schedules a job through the afe they choose a specific
-          host for it. Jobs with metahost can utilize any host that satisfies
-          the metahost criterion. This means that if we had scheduled 4 before
-          3 there is a good chance that a job which could've used another host,
-          will now use the host assigned to a metahost-less job. Given the
-          availability of machines in pool:suites, this almost guarantees
-          starvation for jobs scheduled through the frontend.
-        - Scheduling 4 before 3 also has its pros however, since a suite
-          has the concept of a time out, whereas users can wait. If we hit the
-          process count on the drone a suite can timeout waiting on the test,
-          but a user job generally has a much longer timeout, and relatively
-          harmless consequences.
-        The current ordering was chosed because it is more likely that we will
-        run out of machines in pool:suites than processes on the drone.
-
-        @returns A list of HQEs ordered according to sort_order.
-        """
-        sort_order = ('afe_jobs.priority DESC, '
-                      'ISNULL(host_id), '
-                      'ISNULL(meta_host), '
-                      'parent_job_id, '
-                      'job_id')
-        query=('NOT complete AND NOT active AND status="Queued"'
-               'AND NOT aborted')
-        return list(scheduler_models.HostQueueEntry.fetch(
-            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
-            where=query, order_by=sort_order))
-
-
     def _refresh_pending_queue_entries(self):
         """
         Lookup the pending HostQueueEntries and call our HostScheduler
@@ -766,47 +688,13 @@
 
         @returns A list of pending HostQueueEntries sorted in priority order.
         """
-        queue_entries = self._get_pending_queue_entries()
+        queue_entries = self._job_query_manager.get_pending_queue_entries(
+                only_hostless=not _inline_host_acquisition)
         if not queue_entries:
             return []
-
-        self._host_scheduler.refresh(queue_entries)
-
         return queue_entries
 
 
-    def _schedule_atomic_group(self, queue_entry):
-        """
-        Schedule the given queue_entry on an atomic group of hosts.
-
-        Returns immediately if there are insufficient available hosts.
-
-        Creates new HostQueueEntries based off of queue_entry for the
-        scheduled hosts and starts them all running.
-        """
-        # This is a virtual host queue entry representing an entire
-        # atomic group, find a group and schedule their hosts.
-        group_hosts = self._host_scheduler.find_eligible_atomic_group(
-                queue_entry)
-        if not group_hosts:
-            return
-
-        logging.info('Expanding atomic group entry %s with hosts %s',
-                     queue_entry,
-                     ', '.join(host.hostname for host in group_hosts))
-
-        for assigned_host in group_hosts[1:]:
-            # Create a new HQE for every additional assigned_host.
-            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
-            new_hqe.save()
-            new_hqe.set_host(assigned_host)
-            self._run_queue_entry(new_hqe)
-
-        # The first assigned host uses the original HostQueueEntry
-        queue_entry.set_host(group_hosts[0])
-        self._run_queue_entry(queue_entry)
-
-
     def _schedule_hostless_job(self, queue_entry):
         """Schedule a hostless (suite) job.
 
@@ -836,16 +724,7 @@
                         host_agent_task.queue_entry))
             email_manager.manager.enqueue_notify_email(subject, message)
         else:
-            if queue_entry.host_id is None:
-                queue_entry.set_host(host)
-            else:
-                if host.id != queue_entry.host_id:
-                    raise rdb_utils.RDBException('The rdb returned host: %s '
-                            'but the job:%s was already assigned a host: %s. ' %
-                            (host.hostname, queue_entry.job_id,
-                            queue_entry.host.hostname))
-            queue_entry.update_field('active', True)
-            self._run_queue_entry(queue_entry)
+            self._host_scheduler.schedule_host_job(host, queue_entry)
 
 
     def _schedule_new_jobs(self):
@@ -878,12 +757,17 @@
         stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
         if not host_jobs:
             return
-
-        hosts = rdb_lib.acquire_hosts(self._host_scheduler, host_jobs)
-        for host, queue_entry in zip(hosts, host_jobs):
-            if host:
-                self._schedule_host_job(host, queue_entry)
-                new_jobs_with_hosts = new_jobs_with_hosts + 1
+        if not _inline_host_acquisition:
+            message = ('Found %s jobs that need hosts though '
+                       '_inline_host_acquisition=%s. Will acquire hosts.' %
+                       ([str(job) for job in host_jobs],
+                        _inline_host_acquisition))
+            email_manager.manager.enqueue_notify_email(
+                    'Processing unexpected host acquisition requests', message)
+        jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
+        for host_assignment in jobs_with_hosts:
+            self._schedule_host_job(host_assignment.host, host_assignment.job)
+            new_jobs_with_hosts = new_jobs_with_hosts + 1
 
         stats.Gauge(key).send('new_jobs_with_hosts', new_jobs_with_hosts)
         stats.Gauge(key).send('new_jobs_without_hosts',
@@ -917,12 +801,6 @@
                 self.add_agent_task(task)
 
 
-    def _run_queue_entry(self, queue_entry):
-        self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
-                            queue_entry)
-        queue_entry.schedule_pre_job_tasks()
-
-
     def _find_aborting(self):
         """
         Looks through the afe_host_queue_entries for an aborted entry.
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py
index 62bf6b4..023db12 100755
--- a/scheduler/monitor_db_unittest.py
+++ b/scheduler/monitor_db_unittest.py
@@ -105,6 +105,10 @@
         connection_manager = scheduler_lib.ConnectionManager(autocommit=False)
         self.god.stub_with(connection_manager, 'db_connection', self._database)
         self.god.stub_with(monitor_db, '_db_manager', connection_manager)
+
+        # These tests only make sense if hosts are acquired inline with the
+        # rest of the tick.
+        self.god.stub_with(monitor_db, '_inline_host_acquisition', True)
         self.god.stub_with(monitor_db.BaseDispatcher,
                            '_get_pending_queue_entries',
                            self._get_pending_hqes)
diff --git a/scheduler/query_managers.py b/scheduler/query_managers.py
new file mode 100644
index 0000000..d6325c7
--- /dev/null
+++ b/scheduler/query_managers.py
@@ -0,0 +1,344 @@
+#pylint: disable-msg=C0111
+
+# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Scheduler query manager classes.
+"""
+
+import collections
+import logging
+
+import common
+
+from autotest_lib.frontend import setup_django_environment
+from autotest_lib.frontend.afe import models
+from autotest_lib.scheduler import scheduler_models
+from autotest_lib.site_utils.graphite import stats
+from autotest_lib.scheduler import scheduler_lib
+
+
+class AFEJobQueryManager(object):
+    """Query manager for AFE Jobs."""
+
+    # A subquery to only get hostless jobs (no host and no meta_host).
+    hostless_query = 'host_id IS NULL AND meta_host IS NULL'
+
+
+    def get_pending_queue_entries(self, only_hostless=False):
+        """
+        Fetch a list of new host queue entries.
+
+        The ordering of this list is important, as every new agent
+        we schedule can potentially contribute to the process count
+        on the drone, which has a static limit. The sort order
+        prioritizes jobs as follows:
+        1. High priority jobs: Based on the afe_job's priority
+        2. With hosts and metahosts: This will only happen if we don't
+            activate the hqe after assigning a host to it in
+            schedule_new_jobs.
+        3. With hosts but without metahosts: When tests are scheduled
+            through the frontend the owner of the job would have chosen
+            a host for it.
+        4. Without hosts but with metahosts: This is the common case of
+            a new test that needs a DUT. We assign a host and set it to
+            active so it shouldn't show up in case 2 on the next tick.
+        5. Without hosts and without metahosts: Hostless suite jobs, that
+            will result in new jobs that fall under category 4.
+
+        A note about the ordering of cases 3 and 4:
+        Prioritizing one case above the other leads to earlier acquisition
+        of the following resources: 1. process slots on the drone 2. machines.
+        - When a user schedules a job through the afe they choose a specific
+          host for it. Jobs with metahost can utilize any host that satisfies
+          the metahost criterion. This means that if we had scheduled 4 before
+          3 there is a good chance that a job which could've used another host,
+          will now use the host assigned to a metahost-less job. Given the
+          availability of machines in pool:suites, this almost guarantees
+          starvation for jobs scheduled through the frontend.
+        - Scheduling 4 before 3 also has its pros however, since a suite
+          has the concept of a time out, whereas users can wait. If we hit the
+          process count on the drone a suite can timeout waiting on the test,
+          but a user job generally has a much longer timeout, and relatively
+          harmless consequences.
+        The current ordering was chosen because it is more likely that we will
+        run out of machines in pool:suites than processes on the drone.
+
+        @returns A list of HQEs ordered according to sort_order.
+        """
+        sort_order = ('afe_jobs.priority DESC, '
+                      'ISNULL(host_id), '
+                      'ISNULL(meta_host), '
+                      'parent_job_id, '
+                      'job_id')
+        query=('NOT complete AND NOT active AND status="Queued"'
+               'AND NOT aborted')
+        if only_hostless:
+            query = '%s AND (%s)' % (query, self.hostless_query)
+        return list(scheduler_models.HostQueueEntry.fetch(
+            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
+            where=query, order_by=sort_order))
+
+
+    def get_prioritized_special_tasks(self):
+        """
+        Returns all queued SpecialTasks prioritized for repair first, then
+        cleanup, verify, reset and finally provision.
+
+        @return: list of afe.models.SpecialTasks sorted according to priority.
+        """
+        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
+                                                         is_complete=False,
+                                                         host__locked=False)
+        # exclude hosts with active queue entries unless the SpecialTask is for
+        # that queue entry
+        queued_tasks = models.SpecialTask.objects.add_join(
+                queued_tasks, 'afe_host_queue_entries', 'host_id',
+                join_condition='afe_host_queue_entries.active',
+                join_from_key='host_id', force_left_join=True)
+        queued_tasks = queued_tasks.extra(
+                where=['(afe_host_queue_entries.id IS NULL OR '
+                       'afe_host_queue_entries.id = '
+                               'afe_special_tasks.queue_entry_id)'])
+
+        # reorder tasks by priority
+        task_priority_order = [models.SpecialTask.Task.REPAIR,
+                               models.SpecialTask.Task.CLEANUP,
+                               models.SpecialTask.Task.VERIFY,
+                               models.SpecialTask.Task.RESET,
+                               models.SpecialTask.Task.PROVISION]
+        def task_priority_key(task):
+            return task_priority_order.index(task.task)
+        return sorted(queued_tasks, key=task_priority_key)
+
+
+    @classmethod
+    def get_overlapping_jobs(cls):
+        """A helper method to get all active jobs using the same host.
+
+        @return: A list of dictionaries with the hqe id, job_id and host_id
+            of the currently overlapping jobs.
+        """
+        # Filter all active hqes and stand alone special tasks to make sure
+        # a host isn't being used by two jobs at the same time. An incomplete
+        # stand alone special task can share a host with an active hqe, an
+        # example of this is the cleanup scheduled in gathering.
+        hqe_hosts = list(models.HostQueueEntry.objects.filter(
+                active=1, complete=0, host_id__isnull=False).values_list(
+                'host_id', flat=True))
+        special_task_hosts = list(models.SpecialTask.objects.filter(
+            is_active=1, is_complete=0, host_id__isnull=False,
+            queue_entry_id__isnull=True).values_list('host_id', flat=True))
+        host_counts = collections.Counter(
+                hqe_hosts + special_task_hosts).most_common()
+        multiple_hosts = [count[0] for count in host_counts if count[1] > 1]
+        return list(models.HostQueueEntry.objects.filter(
+                host_id__in=multiple_hosts, active=True).values(
+                        'id', 'job_id', 'host_id'))
+
+
+_timer = stats.Timer('scheduler.host_query_manager')
+class AFEHostQueryManager(object):
+    """Query manager for AFE Hosts."""
+
+    def __init__(self):
+        """Create an AFEHostQueryManager.
+
+        Obtains its database connection from scheduler_lib.ConnectionManager.
+        """
+        self._db = scheduler_lib.ConnectionManager().get_connection()
+
+
+    def _process_many2many_dict(self, rows, flip=False):
+        result = {}
+        for row in rows:
+            left_id, right_id = int(row[0]), int(row[1])
+            if flip:
+                left_id, right_id = right_id, left_id
+            result.setdefault(left_id, set()).add(right_id)
+        return result
+
+
+    def _get_sql_id_list(self, id_list):
+        return ','.join(str(item_id) for item_id in id_list)
+
+
+    def _get_many2many_dict(self, query, id_list, flip=False):
+        if not id_list:
+            return {}
+        query %= self._get_sql_id_list(id_list)
+        rows = self._db.execute(query)
+        return self._process_many2many_dict(rows, flip)
+
+
+    @_timer.decorate
+    def _get_ready_hosts(self):
+        # We don't lose anything by re-doing these checks
+        # even though we release hosts on the same conditions.
+        # In the future we might have multiple clients that
+        # release_hosts and/or lock them independent of the
+        # scheduler tick.
+        hosts = scheduler_models.Host.fetch(
+            where="NOT afe_hosts.leased "
+                  "AND NOT afe_hosts.locked "
+                  "AND (afe_hosts.status IS NULL "
+                      "OR afe_hosts.status = 'Ready')")
+        return dict((host.id, host) for host in hosts)
+
+
+    @_timer.decorate
+    def _get_job_acl_groups(self, job_ids):
+        query = """
+        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
+        FROM afe_jobs
+        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
+        INNER JOIN afe_acl_groups_users ON
+                afe_acl_groups_users.user_id = afe_users.id
+        WHERE afe_jobs.id IN (%s)
+        """
+        return self._get_many2many_dict(query, job_ids)
+
+
+    @_timer.decorate
+    def _get_job_ineligible_hosts(self, job_ids):
+        query = """
+        SELECT job_id, host_id
+        FROM afe_ineligible_host_queues
+        WHERE job_id IN (%s)
+        """
+        return self._get_many2many_dict(query, job_ids)
+
+
+    @_timer.decorate
+    def _get_job_dependencies(self, job_ids):
+        query = """
+        SELECT job_id, label_id
+        FROM afe_jobs_dependency_labels
+        WHERE job_id IN (%s)
+        """
+        return self._get_many2many_dict(query, job_ids)
+
+    @_timer.decorate
+    def _get_host_acls(self, host_ids):
+        query = """
+        SELECT host_id, aclgroup_id
+        FROM afe_acl_groups_hosts
+        WHERE host_id IN (%s)
+        """
+        return self._get_many2many_dict(query, host_ids)
+
+
+    @_timer.decorate
+    def _get_label_hosts(self, host_ids):
+        if not host_ids:
+            return {}, {}
+        query = """
+        SELECT label_id, host_id
+        FROM afe_hosts_labels
+        WHERE host_id IN (%s)
+        """ % self._get_sql_id_list(host_ids)
+        rows = self._db.execute(query)
+        labels_to_hosts = self._process_many2many_dict(rows)
+        hosts_to_labels = self._process_many2many_dict(rows, flip=True)
+        return labels_to_hosts, hosts_to_labels
+
+
+    @classmethod
+    def find_unused_healty_hosts(cls):
+        """Get leased hosts that are currently unused and in the READY state.
+
+        @return: A list of host objects, one for each unused healthy host.
+        """
+        # Avoid any host with a currently active queue entry against it.
+        hqe_join = ('LEFT JOIN afe_host_queue_entries AS active_hqe '
+                    'ON (afe_hosts.id = active_hqe.host_id AND '
+                    'active_hqe.active)')
+
+        # Avoid any host with a new special task against it. There are 2 cases
+        # when an inactive but incomplete special task will not use the host
+        # this tick: 1. When the host is locked 2. When an active hqe already
+        # has special tasks for the same host. In both these cases this host
+        # will not be in the ready hosts list anyway. In all other cases,
+        # an incomplete special task will grab the host before a new job does
+        # by assigning an agent to it.
+        special_task_join = ('LEFT JOIN afe_special_tasks as new_tasks '
+                             'ON (afe_hosts.id = new_tasks.host_id AND '
+                             'new_tasks.is_complete=0)')
+
+        return scheduler_models.Host.fetch(
+            joins='%s %s' % (hqe_join, special_task_join),
+            where="active_hqe.host_id IS NULL AND new_tasks.host_id IS NULL "
+                  "AND afe_hosts.leased "
+                  "AND NOT afe_hosts.locked "
+                  "AND (afe_hosts.status IS NULL "
+                          "OR afe_hosts.status = 'Ready')")
+
+
+    @_timer.decorate
+    def set_leased(self, leased_value, **kwargs):
+        """Modify the leased bit on the hosts that match kwargs.
+
+        @param leased_value: The True/False value of the leased column for
+            the hosts that match kwargs.
+        @param kwargs: Keyword filters passed to models.Host.objects.filter.
+        """
+        logging.info('Setting leased = %s for the hosts that match %s',
+                     leased_value, kwargs)
+        models.Host.objects.filter(**kwargs).update(leased=leased_value)
+
+
+    @_timer.decorate
+    def _get_labels(self, job_dependencies):
+        """
+        Calculate a dict mapping label id to label object so that we don't
+        frequently round trip to the database every time we need a label.
+
+        @param job_dependencies: A dict mapping an integer job id to a list of
+            integer label ids, i.e. {job_id: [label_id]}
+        @return: A dict mapping an integer label id to a scheduler model label
+            object, i.e. {label_id: label_object}
+
+        """
+        id_to_label = dict()
+        # Pull every label that is attached to at least one host
+        host_labels = scheduler_models.Label.fetch(
+                where="id IN (SELECT label_id FROM afe_hosts_labels)")
+        id_to_label.update([(label.id, label) for label in host_labels])
+        # and every label that the given jobs depend on.
+        job_label_set = set()
+        for job_deps in job_dependencies.values():
+            job_label_set.update(job_deps)
+        # When none of the jobs has any dependency labels the IN () list
+        # would be empty, so the second query is skipped.
+        if job_label_set:
+            job_string_label_list = ','.join([str(x) for x in job_label_set])
+            job_labels = scheduler_models.Label.fetch(
+                    where="id IN (%s)" % job_string_label_list)
+            id_to_label.update([(label.id, label) for label in job_labels])
+        return id_to_label
+
+
+    @_timer.decorate
+    def refresh(self, pending_queue_entries):
+        """Update the query manager.
+
+        Cache information about a list of queue entries and eligible hosts
+        from the database so clients can avoid expensive round trips during
+        host acquisition.
+
+        @param pending_queue_entries: A list of queue entries about which we
+            need information.
+        """
+        self._hosts_available = self._get_ready_hosts()
+        relevant_jobs = [queue_entry.job_id
+                         for queue_entry in pending_queue_entries]
+        self._job_acls = self._get_job_acl_groups(relevant_jobs)
+        self._ineligible_hosts = (self._get_job_ineligible_hosts(relevant_jobs))
+        self._job_dependencies = (self._get_job_dependencies(relevant_jobs))
+        host_ids = self._hosts_available.keys()
+        self._host_acls = self._get_host_acls(host_ids)
+        self._label_hosts, self._host_labels = (
+                self._get_label_hosts(host_ids))
+        self._labels = self._get_labels(self._job_dependencies)
+
diff --git a/scheduler/rdb_cache_unittests.py b/scheduler/rdb_cache_unittests.py
index c320409..1e9f4d2 100644
--- a/scheduler/rdb_cache_unittests.py
+++ b/scheduler/rdb_cache_unittests.py
@@ -111,8 +111,7 @@
 
         self.god.stub_with(rdb.AvailableHostRequestHandler,
                            'get_response', local_get_response)
-        self.check_hosts(rdb_lib.acquire_hosts(
-            self.host_scheduler, queue_entries))
+        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
 
 
     def testCachingPriority(self):
@@ -162,8 +161,7 @@
 
         self.god.stub_with(rdb.AvailableHostRequestHandler,
                            'get_response', local_get_response)
-        self.check_hosts(rdb_lib.acquire_hosts(
-                self.host_scheduler, queue_entries))
+        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
 
 
     def testCachingEmptyList(self):
@@ -193,8 +191,7 @@
         queue_entries = self._dispatcher._refresh_pending_queue_entries()
         self.god.stub_with(rdb.AvailableHostRequestHandler,
                            'get_response', local_get_response)
-        self.check_hosts(rdb_lib.acquire_hosts(
-                self.host_scheduler, queue_entries))
+        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
 
 
     def testStaleCacheLine(self):
@@ -238,8 +235,7 @@
 
         self.god.stub_with(rdb.AvailableHostRequestHandler,
                            'get_response', local_get_response)
-        acquired_hosts = list(
-            rdb_lib.acquire_hosts(self.host_scheduler, queue_entries))
+        acquired_hosts = list(rdb_lib.acquire_hosts(queue_entries))
         self.assertTrue(acquired_hosts[0].id == host_1.id and
                         acquired_hosts[1].id == host_2.id and
                         acquired_hosts[2] is None)
@@ -341,6 +337,5 @@
 
         self.god.stub_with(rdb.AvailableHostRequestHandler,
                            'get_response', local_get_response)
-        self.check_hosts(rdb_lib.acquire_hosts(
-            self.host_scheduler, queue_entries))
+        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
 
diff --git a/scheduler/rdb_integration_tests.py b/scheduler/rdb_integration_tests.py
index b804c8c..5b0dfb2 100644
--- a/scheduler/rdb_integration_tests.py
+++ b/scheduler/rdb_integration_tests.py
@@ -132,8 +132,7 @@
         host.leased = 1
         host.save()
         queue_entries = self._dispatcher._refresh_pending_queue_entries()
-        hosts = list(rdb_lib.acquire_hosts(
-            self.host_scheduler, queue_entries))
+        hosts = list(rdb_lib.acquire_hosts(queue_entries))
         self.assertTrue(len(hosts) == 1 and hosts[0] is None)
 
 
@@ -153,7 +152,7 @@
                  self.db_helper.create_host('h2', deps=set(['a']))]
 
         @rdb_hosts.return_rdb_host
-        def local_find_hosts(host_query_maanger, deps, acls):
+        def local_find_hosts(host_query_manger, deps, acls):
             """Return a predetermined list of hosts, one of which is leased."""
             h1 = models.Host.objects.get(hostname='h1')
             h1.leased = 1
@@ -164,8 +163,7 @@
         self.god.stub_with(rdb.AvailableHostQueryManager, 'find_hosts',
                            local_find_hosts)
         queue_entries = self._dispatcher._refresh_pending_queue_entries()
-        hosts = list(rdb_lib.acquire_hosts(
-            self.host_scheduler, queue_entries))
+        hosts = list(rdb_lib.acquire_hosts(queue_entries))
         self.assertTrue(len(hosts) == 2 and None in hosts)
         self.check_hosts(iter(hosts))
 
@@ -226,8 +224,7 @@
         self.db_helper.create_host('h1', deps=deps, acls=acls)
         job = self.create_job(user='autotest_system', deps=deps, acls=acls)
         queue_entries = self._dispatcher._refresh_pending_queue_entries()
-        matching_host  = rdb_lib.acquire_hosts(
-                self.host_scheduler, queue_entries).next()
+        matching_host  = rdb_lib.acquire_hosts(queue_entries).next()
         self.check_host_assignment(job.id, matching_host.id)
         self.assertTrue(matching_host.leased == 1)
 
@@ -244,8 +241,7 @@
         self.db_helper.create_host('h1', deps=host_labels, acls=acls)
         job = self.create_job(user='autotest_system', deps=job_deps, acls=acls)
         queue_entries = self._dispatcher._refresh_pending_queue_entries()
-        matching_host  = rdb_lib.acquire_hosts(
-                self.host_scheduler, queue_entries).next()
+        matching_host  = rdb_lib.acquire_hosts(queue_entries).next()
         self.assert_(not matching_host)
 
 
@@ -265,8 +261,7 @@
         # 1 host that has the 'a' dep isn't.
         job = self.create_job(user='new_user', deps=deps, acls=job_acls)
         queue_entries = self._dispatcher._refresh_pending_queue_entries()
-        matching_host  = rdb_lib.acquire_hosts(
-                self.host_scheduler, queue_entries).next()
+        matching_host  = rdb_lib.acquire_hosts(queue_entries).next()
         self.assert_(not matching_host)
 
 
@@ -291,8 +286,7 @@
 
         self.god.stub_with(rdb_requests.BaseHostRequestManager, 'response',
                 AssignmentValidator.priority_checking_response_handler)
-        self.check_hosts(rdb_lib.acquire_hosts(
-            self.host_scheduler, queue_entries))
+        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
 
 
     def testPriorityLevels(self):
@@ -321,24 +315,21 @@
 
         self.god.stub_with(rdb_requests.BaseHostRequestManager, 'response',
                 AssignmentValidator.priority_checking_response_handler)
-        self.check_hosts(rdb_lib.acquire_hosts(
-            self.host_scheduler, queue_entries))
+        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
 
         # Elevate the priority of the unimportant job, so we now have
         # 2 jobs at the same priority.
         self.db_helper.increment_priority(job_id=unimportant_job.id)
         queue_entries = self._dispatcher._refresh_pending_queue_entries()
         self._release_unused_hosts()
-        self.check_hosts(rdb_lib.acquire_hosts(
-            self.host_scheduler, queue_entries))
+        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
 
         # Prioritize the first job, and confirm that it gets the host over the
         # jobs that got it the last time.
         self.db_helper.increment_priority(job_id=unimportant_job.id)
         queue_entries = self._dispatcher._refresh_pending_queue_entries()
         self._release_unused_hosts()
-        self.check_hosts(rdb_lib.acquire_hosts(
-            self.host_scheduler, queue_entries))
+        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
 
 
     def testFrontendJobScheduling(self):
@@ -362,7 +353,7 @@
         # Check that only the matching host is returned, and that we get 'None'
         # for the second request.
         queue_entries = self._dispatcher._refresh_pending_queue_entries()
-        hosts = list(rdb_lib.acquire_hosts(self.host_scheduler, queue_entries))
+        hosts = list(rdb_lib.acquire_hosts(queue_entries))
         self.assertTrue(len(hosts) == 2 and None in hosts)
         returned_host = [host for host in hosts if host].pop()
         self.assertTrue(matching_host.id == returned_host.id)
@@ -402,8 +393,7 @@
 
         self.god.stub_with(rdb_requests.BaseHostRequestManager, 'response',
                            local_response_handler)
-        self.check_hosts(rdb_lib.acquire_hosts(
-            self.host_scheduler, queue_entries))
+        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
 
 
     def testSuiteOrderedHostAcquisition(self):
@@ -481,7 +471,7 @@
 
         self.god.stub_with(rdb_requests.BaseHostRequestManager, 'response',
                            local_response_handler)
-        list(rdb_lib.acquire_hosts(self.host_scheduler, queue_entries))
+        list(rdb_lib.acquire_hosts(queue_entries))
 
 
     def testConfigurations(self):
@@ -497,6 +487,5 @@
         db_host = self.db_helper.create_host('h1', deps=host_deps)
         self.create_job(user='autotest_system', deps=job_labels)
         queue_entries = self._dispatcher._refresh_pending_queue_entries()
-        matching_host = rdb_lib.acquire_hosts(
-                self.host_scheduler, queue_entries).next()
+        matching_host = rdb_lib.acquire_hosts(queue_entries).next()
         self.assert_(matching_host.id == db_host.id)
diff --git a/scheduler/rdb_lib.py b/scheduler/rdb_lib.py
index 73fc2ef..1449eaa 100644
--- a/scheduler/rdb_lib.py
+++ b/scheduler/rdb_lib.py
@@ -15,15 +15,16 @@
 # format more ameanable to the rdb/rdb request managers.
 class JobQueryManager(object):
     """A caching query manager for all job related information."""
-    def __init__(self, host_scheduler, queue_entries):
+    def __init__(self, queue_entries):
 
-        # TODO(beeps): Break the dependency on the host_scheduler,
+        # TODO(beeps): Break this dependency on the host_query_manager,
         # crbug.com/336934.
-        self.host_scheduler = host_scheduler
+        from autotest_lib.scheduler import query_managers
+        self.query_manager = query_managers.AFEHostQueryManager()
         jobs = [queue_entry.job_id for queue_entry in queue_entries]
-        self._job_acls = self.host_scheduler._get_job_acl_groups(jobs)
-        self._job_deps = self.host_scheduler._get_job_dependencies(jobs)
-        self._labels = self.host_scheduler._get_labels(self._job_deps)
+        self._job_acls = self.query_manager._get_job_acl_groups(jobs)
+        self._job_deps = self.query_manager._get_job_dependencies(jobs)
+        self._labels = self.query_manager._get_labels(self._job_deps)
 
 
     def get_job_info(self, queue_entry):
@@ -45,21 +46,19 @@
                 'priority': queue_entry.job.priority}
 
 
-def acquire_hosts(host_scheduler, queue_entries):
+def acquire_hosts(queue_entries):
     """Acquire hosts for the list of queue_entries.
 
     The act of acquisition involves leasing a host from the rdb.
 
     @param queue_entries: A list of queue_entries that need hosts.
-    @param host_scheduler: The host_scheduler object, needed to get job
-        information.
 
     @yield: An rdb_hosts.RDBClientHostWrapper for each host acquired on behalf
         of a queue_entry, or None if a host wasn't found.
 
     @raises RDBException: If something goes wrong making the request.
     """
-    job_query_manager = JobQueryManager(host_scheduler, queue_entries)
+    job_query_manager = JobQueryManager(queue_entries)
     request_manager = rdb_requests.BaseHostRequestManager(
             rdb_requests.AcquireHostRequest, rdb.rdb_host_request_dispatcher)
     for entry in queue_entries:
diff --git a/scheduler/rdb_testing_utils.py b/scheduler/rdb_testing_utils.py
index dab3a27..917d33f 100644
--- a/scheduler/rdb_testing_utils.py
+++ b/scheduler/rdb_testing_utils.py
@@ -16,6 +16,7 @@
 from autotest_lib.frontend.afe import rdb_model_extensions as rdb_models
 from autotest_lib.scheduler import monitor_db
 from autotest_lib.scheduler import monitor_db_functional_test
+from autotest_lib.scheduler import query_managers
 from autotest_lib.scheduler import scheduler_lib
 from autotest_lib.scheduler import scheduler_models
 from autotest_lib.scheduler import rdb_hosts
@@ -29,6 +30,7 @@
 DEFAULT_DEPS = ['a', 'b']
 DEFAULT_USER = 'system'
 
+
 def get_default_job_params():
     return {'deps': DEFAULT_DEPS, 'user': DEFAULT_USER, 'acls': DEFAULT_ACLS,
             'priority': 0, 'parent_job_id': 0}
@@ -87,6 +89,11 @@
 
 
     @classmethod
+    def get_hqes(cls, **kwargs):
+        return models.HostQueueEntry.objects.filter(**kwargs)
+
+
+    @classmethod
     def create_label(cls, name, **kwargs):
         label = cls.get_labels(name=name, **kwargs)
         return (models.Label.add_object(name=name, **kwargs)
@@ -100,6 +107,24 @@
 
 
     @classmethod
+    def create_special_task(cls, job_id=None, host_id=None,
+                            task=models.SpecialTask.Task.VERIFY,
+                            user='autotest-system'):
+        """Create a special task for a job's host, or for host_id directly.
+
+        @raises ValueError: If neither job_id nor host_id identifies a host.
+        """
+        queue_entry = cls.get_hqes(job_id=job_id)[0] if job_id else None
+        host_id = queue_entry.host.id if job_id else host_id
+        if not host_id:  # objects.get() raises rather than returning None.
+            raise ValueError('Require a host to create special tasks.')
+        host = models.Host.objects.get(id=host_id)
+        return models.SpecialTask.objects.create(
+                host=host, queue_entry=queue_entry, task=task,
+                requested_by_id=cls.create_user(user).id)
+
+
+    @classmethod
     def add_labels_to_host(cls, host, label_names=set([])):
         label_objects = set([])
         for label in label_names:
@@ -189,23 +214,6 @@
 
 
     @classmethod
-    def add_host_to_job(cls, host, job_id):
-        """Add a host to the hqe of a job.
-
-        @param host: An instance of the host model.
-        @param job_id: The job to which we need to add the host.
-
-        @raises ValueError: If the hqe for the job already has a host,
-            or if the host argument isn't a Host instance.
-        """
-        hqe = models.HostQueueEntry.objects.get(job_id=job_id)
-        if hqe.host:
-            raise ValueError('HQE for job %s already has a host' % job_id)
-        hqe.host = host
-        hqe.save()
-
-
-    @classmethod
     def increment_priority(cls, job_id):
         job = models.Job.objects.get(id=job_id)
         job.priority = job.priority + 1
@@ -232,7 +240,12 @@
         self.host_scheduler.tick()
 
 
-    def setUp(self):
+    def setUp(self, inline_host_acquisition=True):
+        """Common setup module for tests that need a jobs/host database.
+
+        @param inline_host_acquisition: If True, the dispatcher tries to acquire
+            hosts inline with the rest of the tick.
+        """
         self.db_helper = DBHelper()
         self._database = self.db_helper.database
         # Runs syncdb setting up initial database conditions
@@ -241,8 +254,12 @@
         self.god.stub_with(connection_manager, 'db_connection', self._database)
         self.god.stub_with(monitor_db, '_db_manager', connection_manager)
         self.god.stub_with(scheduler_models, '_db', self._database)
+        self.god.stub_with(monitor_db, '_inline_host_acquisition',
+                           inline_host_acquisition)
         self._dispatcher = monitor_db.Dispatcher()
         self.host_scheduler = self._dispatcher._host_scheduler
+        self.host_query_manager = query_managers.AFEHostQueryManager()
+        self.job_query_manager = self._dispatcher._job_query_manager
         self._release_unused_hosts()