autotest: Delete rdb and related files

Again, all of these files import each other in an obscene way (they
work around import loops by putting imports in functions), so the only
way to make sure there are no references is delete all of them
together.

Checked with grep that there are no more hits:

grep scheduler_models
grep query_managers
grep import.*rdb
grep rdb.*import

Bug: 1033823
Change-Id: Ic3659dc784e147ef48110b9dcc8da5073b28ac69
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/third_party/autotest/+/1966844
Reviewed-by: Xixuan Wu <xixuan@chromium.org>
Tested-by: Allen Li <ayatane@chromium.org>
Commit-Queue: Allen Li <ayatane@chromium.org>
diff --git a/scheduler/query_managers.py b/scheduler/query_managers.py
deleted file mode 100644
index 2d2dcd8..0000000
--- a/scheduler/query_managers.py
+++ /dev/null
@@ -1,393 +0,0 @@
-#pylint: disable-msg=C0111
-
-# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Scheduler library classes.
-"""
-
-import collections
-import logging
-
-import common
-
-from autotest_lib.frontend import setup_django_environment
-
-from autotest_lib.client.common_lib import utils
-from autotest_lib.frontend.afe import models
-from autotest_lib.server.cros.dynamic_suite import constants
-from autotest_lib.scheduler import scheduler_models
-from autotest_lib.scheduler import scheduler_lib
-
-try:
-    from chromite.lib import metrics
-except ImportError:
-    metrics = utils.metrics_mock
-
-
-_job_timer_name = 'chromeos/autotest/scheduler/job_query_durations/%s'
-class AFEJobQueryManager(object):
-    """Query manager for AFE Jobs."""
-
-    # A subquery to only get inactive hostless jobs.
-    hostless_query = 'host_id IS NULL AND meta_host IS NULL'
-
-
-    @metrics.SecondsTimerDecorator(
-            _job_timer_name % 'get_pending_queue_entries')
-    def get_pending_queue_entries(self, only_hostless=False):
-        """
-        Fetch a list of new host queue entries.
-
-        The ordering of this list is important, as every new agent
-        we schedule can potentially contribute to the process count
-        on the drone, which has a static limit. The sort order
-        prioritizes jobs as follows:
-        1. High priority jobs: Based on the afe_job's priority
-        2. With hosts and metahosts: This will only happen if we don't
-            activate the hqe after assigning a host to it in
-            schedule_new_jobs.
-        3. With hosts but without metahosts: When tests are scheduled
-            through the frontend the owner of the job would have chosen
-            a host for it.
-        4. Without hosts but with metahosts: This is the common case of
-            a new test that needs a DUT. We assign a host and set it to
-            active so it shouldn't show up in case 2 on the next tick.
-        5. Without hosts and without metahosts: Hostless suite jobs, that
-            will result in new jobs that fall under category 4.
-
-        A note about the ordering of cases 3 and 4:
-        Prioritizing one case above the other leads to earlier acquisition
-        of the following resources: 1. process slots on the drone 2. machines.
-        - When a user schedules a job through the afe they choose a specific
-          host for it. Jobs with metahost can utilize any host that satisfies
-          the metahost criterion. This means that if we had scheduled 4 before
-          3 there is a good chance that a job which could've used another host,
-          will now use the host assigned to a metahost-less job. Given the
-          availability of machines in pool:suites, this almost guarantees
-          starvation for jobs scheduled through the frontend.
-        - Scheduling 4 before 3 also has its pros however, since a suite
-          has the concept of a time out, whereas users can wait. If we hit the
-          process count on the drone a suite can timeout waiting on the test,
-          but a user job generally has a much longer timeout, and relatively
-          harmless consequences.
-        The current ordering was chosed because it is more likely that we will
-        run out of machines in pool:suites than processes on the drone.
-
-        @returns A list of HQEs ordered according to sort_order.
-        """
-        sort_order = ('afe_jobs.priority DESC, '
-                      'ISNULL(host_id), '
-                      'ISNULL(meta_host), '
-                      'parent_job_id, '
-                      'job_id')
-        # Don't execute jobs that should be executed by a shard in the global
-        # scheduler.
-        # This won't prevent the shard scheduler to run this, as the shard db
-        # doesn't have an an entry in afe_shards_labels.
-        query=('NOT complete AND NOT active AND status="Queued"'
-               'AND NOT aborted AND afe_shards_labels.id IS NULL')
-
-        # TODO(jakobjuelich, beeps): Optimize this query. Details:
-        # Compressed output of EXPLAIN <query>:
-        # +------------------------+--------+-------------------------+-------+
-        # | table                  | type   | key                     | rows  |
-        # +------------------------+--------+-------------------------+-------+
-        # | afe_host_queue_entries | ref    | host_queue_entry_status | 30536 |
-        # | afe_shards_labels      | ref    | shard_label_id_fk       |     1 |
-        # | afe_jobs               | eq_ref | PRIMARY                 |     1 |
-        # +------------------------+--------+-------------------------+-------+
-        # This shows the first part of the query fetches a lot of objects, that
-        # are then filtered. The joins are comparably fast: There's usually just
-        # one or none shard mapping that can be answered fully using an index
-        # (shard_label_id_fk), similar thing applies to the job.
-        #
-        # This works for now, but once O(#Jobs in shard) << O(#Jobs in Queued),
-        # it might be more efficient to filter on the meta_host first, instead
-        # of the status.
-        if only_hostless:
-            query = '%s AND (%s)' % (query, self.hostless_query)
-        return list(scheduler_models.HostQueueEntry.fetch(
-            joins=('INNER JOIN afe_jobs ON (job_id=afe_jobs.id) '
-                   'LEFT JOIN afe_shards_labels ON ('
-                   'meta_host=afe_shards_labels.label_id)'),
-            where=query, order_by=sort_order))
-
-
-    @metrics.SecondsTimerDecorator(
-            _job_timer_name % 'get_prioritized_special_tasks')
-    def get_prioritized_special_tasks(self, only_tasks_with_leased_hosts=False):
-        """
-        Returns all queued SpecialTasks prioritized for repair first, then
-        cleanup, then verify.
-
-        @param only_tasks_with_leased_hosts: If true, this method only returns
-            tasks with leased hosts.
-
-        @return: list of afe.models.SpecialTasks sorted according to priority.
-        """
-        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
-                                                         is_complete=False,
-                                                         host__locked=False)
-        # exclude hosts with active queue entries unless the SpecialTask is for
-        # that queue entry
-        queued_tasks = models.SpecialTask.objects.add_join(
-                queued_tasks, 'afe_host_queue_entries', 'host_id',
-                join_condition='afe_host_queue_entries.active',
-                join_from_key='host_id', force_left_join=True)
-        queued_tasks = queued_tasks.extra(
-                where=['(afe_host_queue_entries.id IS NULL OR '
-                       'afe_host_queue_entries.id = '
-                               'afe_special_tasks.queue_entry_id)'])
-        if only_tasks_with_leased_hosts:
-            queued_tasks = queued_tasks.filter(host__leased=True)
-
-        # reorder tasks by priority
-        task_priority_order = [models.SpecialTask.Task.REPAIR,
-                               models.SpecialTask.Task.CLEANUP,
-                               models.SpecialTask.Task.VERIFY,
-                               models.SpecialTask.Task.RESET,
-                               models.SpecialTask.Task.PROVISION]
-        def task_priority_key(task):
-            return task_priority_order.index(task.task)
-        return sorted(queued_tasks, key=task_priority_key)
-
-
-    @classmethod
-    def get_overlapping_jobs(cls):
-        """A helper method to get all active jobs using the same host.
-
-        @return: A list of dictionaries with the hqe id, job_id and host_id
-            of the currently overlapping jobs.
-        """
-        # Filter all active hqes and stand alone special tasks to make sure
-        # a host isn't being used by two jobs at the same time. An incomplete
-        # stand alone special task can share a host with an active hqe, an
-        # example of this is the cleanup scheduled in gathering.
-        hqe_hosts = list(models.HostQueueEntry.objects.filter(
-                active=1, complete=0, host_id__isnull=False).values_list(
-                'host_id', flat=True))
-        special_task_hosts = list(models.SpecialTask.objects.filter(
-                is_active=1, is_complete=0, host_id__isnull=False,
-                queue_entry_id__isnull=True).values_list('host_id', flat=True))
-        host_counts = collections.Counter(
-                hqe_hosts + special_task_hosts).most_common()
-        multiple_hosts = [count[0] for count in host_counts if count[1] > 1]
-        return list(models.HostQueueEntry.objects.filter(
-                host_id__in=multiple_hosts, active=True).values(
-                        'id', 'job_id', 'host_id'))
-
-
-    @metrics.SecondsTimerDecorator(
-            _job_timer_name % 'get_suite_host_assignment')
-    def get_suite_host_assignment(self):
-        """A helper method to get how many hosts each suite is holding.
-
-        @return: Two dictionaries (suite_host_num, hosts_to_suites)
-                 suite_host_num maps suite job id to number of hosts
-                 holding by its child jobs.
-                 hosts_to_suites contains current hosts held by
-                 any suites, and maps the host id to its parent_job_id.
-        """
-        query = models.HostQueueEntry.objects.filter(
-                host_id__isnull=False, complete=0, active=1,
-                job__parent_job_id__isnull=False)
-        suite_host_num = {}
-        hosts_to_suites = {}
-        for hqe in query:
-            host_id = hqe.host_id
-            parent_job_id = hqe.job.parent_job_id
-            count = suite_host_num.get(parent_job_id, 0)
-            suite_host_num[parent_job_id] = count + 1
-            hosts_to_suites[host_id] = parent_job_id
-        return suite_host_num, hosts_to_suites
-
-
-    @metrics.SecondsTimerDecorator( _job_timer_name % 'get_min_duts_of_suites')
-    def get_min_duts_of_suites(self, suite_job_ids):
-        """Load suite_min_duts job keyval for a set of suites.
-
-        @param suite_job_ids: A set of suite job ids.
-
-        @return: A dictionary where the key is a suite job id,
-                 the value is the value of 'suite_min_duts'.
-        """
-        query = models.JobKeyval.objects.filter(
-                job_id__in=suite_job_ids,
-                key=constants.SUITE_MIN_DUTS_KEY, value__isnull=False)
-        return dict((keyval.job_id, int(keyval.value)) for keyval in query)
-
-
-_host_timer_name = 'chromeos/autotest/scheduler/host_query_durations/%s'
-class AFEHostQueryManager(object):
-    """Query manager for AFE Hosts."""
-
-    def __init__(self):
-        """Create an AFEHostQueryManager.
-
-        @param db: A connection to the database with the afe_hosts table.
-        """
-        self._db = scheduler_lib.ConnectionManager().get_connection()
-
-
-    def _process_many2many_dict(self, rows, flip=False):
-        result = {}
-        for row in rows:
-            left_id, right_id = int(row[0]), int(row[1])
-            if flip:
-                left_id, right_id = right_id, left_id
-            result.setdefault(left_id, set()).add(right_id)
-        return result
-
-
-    def _get_sql_id_list(self, id_list):
-        return ','.join(str(item_id) for item_id in id_list)
-
-
-    def _get_many2many_dict(self, query, id_list, flip=False):
-        if not id_list:
-            return {}
-        query %= self._get_sql_id_list(id_list)
-        rows = self._db.execute(query)
-        return self._process_many2many_dict(rows, flip)
-
-
-    def _get_ready_hosts(self):
-        # We don't lose anything by re-doing these checks
-        # even though we release hosts on the same conditions.
-        # In the future we might have multiple clients that
-        # release_hosts and/or lock them independent of the
-        # scheduler tick.
-        hosts = scheduler_models.Host.fetch(
-            where="NOT afe_hosts.leased "
-                  "AND NOT afe_hosts.locked "
-                  "AND (afe_hosts.status IS NULL "
-                      "OR afe_hosts.status = 'Ready')")
-        return dict((host.id, host) for host in hosts)
-
-
-    @metrics.SecondsTimerDecorator(_host_timer_name % 'get_job_acl_groups')
-    def _get_job_acl_groups(self, job_ids):
-        query = """
-        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
-        FROM afe_jobs
-        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
-        INNER JOIN afe_acl_groups_users ON
-                afe_acl_groups_users.user_id = afe_users.id
-        WHERE afe_jobs.id IN (%s)
-        """
-        return self._get_many2many_dict(query, job_ids)
-
-
-    def _get_job_ineligible_hosts(self, job_ids):
-        query = """
-        SELECT job_id, host_id
-        FROM afe_ineligible_host_queues
-        WHERE job_id IN (%s)
-        """
-        return self._get_many2many_dict(query, job_ids)
-
-
-    @metrics.SecondsTimerDecorator(_host_timer_name % 'get_job_dependencies')
-    def _get_job_dependencies(self, job_ids):
-        query = """
-        SELECT job_id, label_id
-        FROM afe_jobs_dependency_labels
-        WHERE job_id IN (%s)
-        """
-        return self._get_many2many_dict(query, job_ids)
-
-
-    @classmethod
-    def find_unused_healty_hosts(cls):
-        """Get hosts that are currently unused and in the READY state.
-
-        @return: A list of host objects, one for each unused healthy host.
-        """
-        # Avoid any host with a currently active queue entry against it.
-        hqe_join = ('LEFT JOIN afe_host_queue_entries AS active_hqe '
-                    'ON (afe_hosts.id = active_hqe.host_id AND '
-                    'active_hqe.active)')
-
-        # Avoid any host with a new special task against it. There are 2 cases
-        # when an inactive but incomplete special task will not use the host
-        # this tick: 1. When the host is locked 2. When an active hqe already
-        # has special tasks for the same host. In both these cases this host
-        # will not be in the ready hosts list anyway. In all other cases,
-        # an incomplete special task will grab the host before a new job does
-        # by assigning an agent to it.
-        special_task_join = ('LEFT JOIN afe_special_tasks as new_tasks '
-                             'ON (afe_hosts.id = new_tasks.host_id AND '
-                             'new_tasks.is_complete=0)')
-
-        return scheduler_models.Host.fetch(
-            joins='%s %s' % (hqe_join, special_task_join),
-            where="active_hqe.host_id IS NULL AND new_tasks.host_id IS NULL "
-                  "AND afe_hosts.leased "
-                  "AND NOT afe_hosts.locked "
-                  "AND (afe_hosts.status IS NULL "
-                          "OR afe_hosts.status = 'Ready')")
-
-    @metrics.SecondsTimerDecorator(_host_timer_name % 'set_leased')
-    def set_leased(self, leased_value, **kwargs):
-        """Modify the leased bit on the hosts with ids in host_ids.
-
-        @param leased_value: The True/False value of the leased column for
-            the hosts with ids in host_ids.
-        @param kwargs: The args to use in finding matching hosts.
-        """
-        logging.info('Setting leased = %s for the hosts that match %s',
-                     leased_value, kwargs)
-        models.Host.objects.filter(**kwargs).update(leased=leased_value)
-
-
-    @metrics.SecondsTimerDecorator(_host_timer_name % 'get_labels')
-    def _get_labels(self, job_dependencies):
-        """
-        Calculate a dict mapping label id to label object so that we don't
-        frequently round trip to the database every time we need a label.
-
-        @param job_dependencies: A dict mapping an integer job id to a list of
-            integer label id's.  ie. {job_id: [label_id]}
-        @return: A dict mapping an integer label id to a scheduler model label
-            object.  ie. {label_id: label_object}
-
-        """
-        id_to_label = dict()
-        # Pull all the labels on hosts we might look at
-        host_labels = scheduler_models.Label.fetch(
-                where="id IN (SELECT label_id FROM afe_hosts_labels)")
-        id_to_label.update([(label.id, label) for label in host_labels])
-        # and pull all the labels on jobs we might look at.
-        job_label_set = set()
-        for job_deps in job_dependencies.values():
-            job_label_set.update(job_deps)
-        # On the rare/impossible chance that no jobs have any labels, we
-        # can skip this.
-        if job_label_set:
-            job_string_label_list = ','.join([str(x) for x in job_label_set])
-            job_labels = scheduler_models.Label.fetch(
-                    where="id IN (%s)" % job_string_label_list)
-            id_to_label.update([(label.id, label) for label in job_labels])
-        return id_to_label
-
-
-    def refresh(self, pending_queue_entries):
-        """Update the query manager.
-
-        Cache information about a list of queue entries and eligible hosts
-        from the database so clients can avoid expensive round trips during
-        host acquisition.
-
-        @param pending_queue_entries: A list of queue entries about which we
-            need information.
-        """
-        self._hosts_available = self._get_ready_hosts()
-        relevant_jobs = [queue_entry.job_id
-                         for queue_entry in pending_queue_entries]
-        self._job_acls = self._get_job_acl_groups(relevant_jobs)
-        self._ineligible_hosts = (self._get_job_ineligible_hosts(relevant_jobs))
-        self._job_dependencies = (self._get_job_dependencies(relevant_jobs))
-        host_ids = self._hosts_available.keys()
-        self._labels = self._get_labels(self._job_dependencies)
diff --git a/scheduler/rdb.py b/scheduler/rdb.py
deleted file mode 100644
index d598659..0000000
--- a/scheduler/rdb.py
+++ /dev/null
@@ -1,529 +0,0 @@
-# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Rdb server module.
-"""
-
-import logging
-
-import common
-
-from django.core import exceptions as django_exceptions
-from django.db.models import fields
-from django.db.models import Q
-from autotest_lib.frontend.afe import models
-from autotest_lib.scheduler import rdb_cache_manager
-from autotest_lib.scheduler import rdb_hosts
-from autotest_lib.scheduler import rdb_requests
-from autotest_lib.scheduler import rdb_utils
-from autotest_lib.server import utils
-
-try:
-    from chromite.lib import metrics
-except ImportError:
-    metrics = utils.metrics_mock
-
-
-_rdb_timer_name = 'chromeos/autotest/scheduler/rdb/durations/%s'
-_is_master = not utils.is_shard()
-
-# Qeury managers: Provide a layer of abstraction over the database by
-# encapsulating common query patterns used by the rdb.
-class BaseHostQueryManager(object):
-    """Base manager for host queries on all hosts.
-    """
-
-    host_objects = models.Host.objects
-
-
-    def update_hosts(self, host_ids, **kwargs):
-        """Update fields on a hosts.
-
-        @param host_ids: A list of ids of hosts to update.
-        @param kwargs: A key value dictionary corresponding to column, value
-            in the host database.
-        """
-        self.host_objects.filter(id__in=host_ids).update(**kwargs)
-
-
-    @rdb_hosts.return_rdb_host
-    def get_hosts(self, ids):
-        """Get host objects for the given ids.
-
-        @param ids: The ids for which we need host objects.
-
-        @returns: A list of RDBServerHostWrapper objects, ordered by host_id.
-        """
-        return self.host_objects.filter(id__in=ids).order_by('id')
-
-
-    @rdb_hosts.return_rdb_host
-    def find_hosts(self, deps, acls):
-        """Finds valid hosts matching deps, acls.
-
-        @param deps: A list/frozenset of dependencies (label id) to match.
-        @param acls: A list/frozenset of acls, at least one of which must
-            coincide with an acl group the chosen host is in.
-
-        @return: A set of matching hosts available.
-        """
-        hosts_available = self.host_objects.filter(invalid=0)
-        hosts_available = hosts_available.filter(Q(aclgroup__id__in=acls))
-        hosts_available = models.Host.get_hosts_with_label_ids(
-                list(deps), hosts_available)
-        return set(hosts_available)
-
-
-class AvailableHostQueryManager(BaseHostQueryManager):
-    """Query manager for requests on un-leased, un-locked hosts.
-    """
-
-    host_objects = models.Host.leased_objects
-
-
-# Request Handlers: Used in conjunction with requests in rdb_utils, these
-# handlers acquire hosts for a request and record the acquisition in
-# an response_map dictionary keyed on the request itself, with the host/hosts
-# as values.
-class BaseHostRequestHandler(object):
-    """Handler for requests related to hosts, leased or unleased.
-
-    This class is only capable of blindly returning host information.
-    """
-
-    def __init__(self):
-        self.host_query_manager = BaseHostQueryManager()
-        self.response_map = {}
-
-
-    def update_response_map(self, request, response, append=False):
-        """Record a response for a request.
-
-        The response_map only contains requests that were either satisfied, or
-        that ran into an exception. Often this translates to reserving hosts
-        against a request. If the rdb hit an exception processing a request, the
-        exception gets recorded in the map for the client to reraise.
-
-        @param response: A response for the request.
-        @param request: The request that has reserved these hosts.
-        @param append: Boolean, whether to append new hosts in
-                       |response| for existing request.
-                       Will not append if existing response is
-                       a list of exceptions.
-
-        @raises RDBException: If an empty values is added to the map.
-        """
-        if not response:
-            raise rdb_utils.RDBException('response_map dict can only contain '
-                    'valid responses. Request %s, response %s is invalid.' %
-                     (request, response))
-        exist_response = self.response_map.setdefault(request, [])
-        if exist_response and not append:
-            raise rdb_utils.RDBException('Request %s already has response %s '
-                                         'the rdb cannot return multiple '
-                                         'responses for the same request.' %
-                                         (request, response))
-        if exist_response and append and not isinstance(
-                exist_response[0], rdb_hosts.RDBHost):
-            # Do not append if existing response contains exception.
-            return
-        exist_response.extend(response)
-
-
-    def _check_response_map(self):
-        """Verify that we never give the same host to different requests.
-
-        @raises RDBException: If the same host is assigned to multiple requests.
-        """
-        unique_hosts = set([])
-        for request, response in self.response_map.iteritems():
-            # Each value in the response map can only either be a list of
-            # RDBHosts or a list of RDBExceptions, not a mix of both.
-            if isinstance(response[0], rdb_hosts.RDBHost):
-                if any([host in unique_hosts for host in response]):
-                    raise rdb_utils.RDBException(
-                            'Assigning the same host to multiple requests. New '
-                            'hosts %s, request %s, response_map: %s' %
-                            (response, request, self.response_map))
-                else:
-                    unique_hosts = unique_hosts.union(response)
-
-
-    def _record_exceptions(self, request, exceptions):
-        """Record a list of exceptions for a request.
-
-        @param request: The request for which the exceptions were hit.
-        @param exceptions: The exceptions hit while processing the request.
-        """
-        rdb_exceptions = [rdb_utils.RDBException(ex) for ex in exceptions]
-        self.update_response_map(request, rdb_exceptions)
-
-
-    def get_response(self):
-        """Convert all RDBServerHostWrapper objects to host info dictionaries.
-
-        @return: A dictionary mapping requests to a list of matching host_infos.
-
-        @raises RDBException: If the same host is assigned to multiple requests.
-        """
-        self._check_response_map()
-        for request, response in self.response_map.iteritems():
-            self.response_map[request] = [reply.wire_format()
-                                          for reply in response]
-        return self.response_map
-
-
-    def update_hosts(self, update_requests):
-        """Updates host tables with a payload.
-
-        @param update_requests: A list of update requests, as defined in
-            rdb_requests.UpdateHostRequest.
-        """
-        # Last payload for a host_id wins in the case of conflicting requests.
-        unique_host_requests = {}
-        for request in update_requests:
-            if unique_host_requests.get(request.host_id):
-                unique_host_requests[request.host_id].update(request.payload)
-            else:
-                unique_host_requests[request.host_id] = request.payload
-
-        # Batch similar payloads so we can do them in one table scan.
-        similar_requests = {}
-        for host_id, payload in unique_host_requests.iteritems():
-            similar_requests.setdefault(payload, []).append(host_id)
-
-        # If fields of the update don't match columns in the database,
-        # record the exception in the response map. This also means later
-        # updates will get applied even if previous updates fail.
-        for payload, hosts in similar_requests.iteritems():
-            try:
-                response = self.host_query_manager.update_hosts(hosts, **payload)
-            except (django_exceptions.FieldError,
-                    fields.FieldDoesNotExist, ValueError) as e:
-                for host in hosts:
-                    # Since update requests have a consistent hash this will map
-                    # to the same key as the original request.
-                    request = rdb_requests.UpdateHostRequest(
-                            host_id=host, payload=payload).get_request()
-                    self._record_exceptions(request, [e])
-
-
-    def batch_get_hosts(self, host_requests):
-        """Get hosts matching the requests.
-
-        This method does not acquire the hosts, i.e it reserves hosts against
-        requests leaving their leased state untouched.
-
-        @param host_requests: A list of requests, as defined in
-            rdb_utils.BaseHostRequest.
-        """
-        host_ids = set([request.host_id for request in host_requests])
-        host_map = {}
-
-        # This list will not contain available hosts if executed using
-        # an AvailableHostQueryManager.
-        for host in self.host_query_manager.get_hosts(host_ids):
-            host_map[host.id] = host
-        for request in host_requests:
-            if request.host_id in host_map:
-                self.update_response_map(request, [host_map[request.host_id]])
-            else:
-                logging.warning('rdb could not get host for request: %s, it '
-                                'is already leased or locked', request)
-
-
-class AvailableHostRequestHandler(BaseHostRequestHandler):
-    """Handler for requests related to available (unleased and unlocked) hosts.
-
-    This class is capable of acquiring or validating hosts for requests.
-    """
-
-
-    def __init__(self):
-        self.host_query_manager = AvailableHostQueryManager()
-        self.cache = rdb_cache_manager.RDBHostCacheManager()
-        self.response_map = {}
-        self.unsatisfied_requests = 0
-        self.leased_hosts_count = 0
-        self.request_accountant = None
-
-
-    @metrics.SecondsTimerDecorator(_rdb_timer_name % 'lease_hosts')
-    def lease_hosts(self, hosts):
-        """Leases a list of hosts.
-
-        @param hosts: A list of RDBServerHostWrapper instances to lease.
-
-        @return: The list of RDBServerHostWrappers that were successfully
-            leased.
-        """
-        #TODO(beeps): crbug.com/353183.
-        unleased_hosts = set(hosts)
-        leased_hosts = set([])
-        for host in unleased_hosts:
-            try:
-                host.lease()
-            except rdb_utils.RDBException as e:
-                logging.error('Unable to lease host %s: %s', host.hostname, e)
-            else:
-                leased_hosts.add(host)
-        return list(leased_hosts)
-
-
-    @classmethod
-    def valid_host_assignment(cls, request, host):
-        """Check if a host, request pairing is valid.
-
-        @param request: The request to match against the host.
-        @param host: An RDBServerHostWrapper instance.
-
-        @return: True if the host, request assignment is valid.
-
-        @raises RDBException: If the request already has another host_ids
-            associated with it.
-        """
-        if request.host_id and request.host_id != host.id:
-            raise rdb_utils.RDBException(
-                    'Cannot assign a different host for request: %s, it '
-                    'already has one: %s ' % (request, host.id))
-
-        # Getting all labels and acls might result in large queries, so
-        # bail early if the host is already leased.
-        if host.leased:
-            return False
-        # If a host is invalid it must be a one time host added to the
-        # afe specifically for this purpose, so it doesn't require acl checking.
-        acl_match = (request.acls.intersection(host.acls) or host.invalid)
-        label_match = (request.deps.intersection(host.labels) == request.deps)
-        return acl_match and label_match
-
-
-    @classmethod
-    def _sort_hosts_by_preferred_deps(cls, hosts, preferred_deps):
-        """Sort hosts in the order of how many preferred deps it has.
-
-        This allows rdb always choose the hosts with the most preferred deps
-        for a request. One important use case is including cros-version as
-        a preferred dependence. By choosing a host with the same cros-version,
-        we can save the time on provisioning it. Note this is not guaranteed
-        if preferred_deps contains other labels as well.
-
-        @param hosts: A list of hosts to sort.
-        @param preferred_deps: A list of deps that are preferred.
-
-        @return: A list of sorted hosts.
-
-        """
-        hosts = sorted(
-                hosts,
-                key=lambda host: len(set(preferred_deps) & set(host.labels)),
-                reverse=True)
-        return hosts
-
-
-    @rdb_cache_manager.memoize_hosts
-    def _acquire_hosts(self, request, hosts_required, is_acquire_min_duts=False,
-                       **kwargs):
-        """Acquire hosts for a group of similar requests.
-
-        Find and acquire hosts that can satisfy a group of requests.
-        1. If the caching decorator doesn't pass in a list of matching hosts
-           via the MEMOIZE_KEY this method will directly check the database for
-           matching hosts.
-        2. If all matching hosts are not leased for this request, the remaining
-           hosts are returned to the caching decorator, to place in the cache.
-
-        @param hosts_required: Number of hosts required to satisfy request.
-        @param request: The request for hosts.
-        @param is_acquire_min_duts: Boolean. Indicate whether this is to
-                                    acquire minimum required duts, only used
-                                    for stats purpose.
-
-        @return: The list of excess matching hosts.
-        """
-        hosts = kwargs.get(rdb_cache_manager.MEMOIZE_KEY, [])
-        if not hosts:
-            hosts = self.host_query_manager.find_hosts(
-                            request.deps, request.acls)
-
-        # <-----[:attempt_lease_hosts](evicted)--------> <-(returned, cached)->
-        # |   -leased_hosts-  |   -stale cached hosts-  | -unleased matching- |
-        # --used this request---used by earlier request----------unused--------
-        hosts = self._sort_hosts_by_preferred_deps(
-                hosts, request.preferred_deps)
-        attempt_lease_hosts = min(len(hosts), hosts_required)
-        leased_host_count = 0
-        if attempt_lease_hosts:
-            leased_hosts = self.lease_hosts(hosts[:attempt_lease_hosts])
-            if leased_hosts:
-                self.update_response_map(request, leased_hosts, append=True)
-
-            # [:attempt_leased_hosts] - leased_hosts will include hosts that
-            # failed leasing, most likely because they're already leased, so
-            # don't cache them again.
-            leased_host_count = len(leased_hosts)
-            failed_leasing = attempt_lease_hosts - leased_host_count
-            if failed_leasing > 0:
-                # For the sake of simplicity this calculation assumes that
-                # leasing only fails if there's a stale cached host already
-                # leased by a previous request, ergo, we can only get here
-                # through a cache hit.
-                line_length = len(hosts)
-                self.cache.stale_entries.append(
-                        (float(failed_leasing)/line_length) * 100)
-            self.leased_hosts_count += leased_host_count
-        if is_acquire_min_duts:
-            self.request_accountant.record_acquire_min_duts(
-                    request, hosts_required, leased_host_count)
-        self.unsatisfied_requests += max(hosts_required - leased_host_count, 0)
-        # Cache the unleased matching hosts against the request.
-        return hosts[attempt_lease_hosts:]
-
-
-    @metrics.SecondsTimerDecorator(_rdb_timer_name % 'batch_acquire_hosts')
-    def batch_acquire_hosts(self, host_requests):
-        """Acquire hosts for a list of requests.
-
-        The act of acquisition involves finding and leasing a set of hosts
-        that match the parameters of a request. Each acquired host is added
-        to the response_map dictionary as an RDBServerHostWrapper.
-
-        @param host_requests: A list of requests to acquire hosts.
-        """
-        distinct_requests = 0
-
-        logging.debug('Processing %s host acquisition requests',
-                      len(host_requests))
-        metrics.Gauge('chromeos/autotest/scheduler/pending_host_acq_requests'
-                      ).set(len(host_requests))
-
-        self.request_accountant = rdb_utils.RequestAccountant(host_requests)
-        # First pass tries to satisfy min_duts for each suite.
-        for request in self.request_accountant.requests:
-            to_acquire = self.request_accountant.get_min_duts(request)
-            if to_acquire > 0:
-                self._acquire_hosts(request, to_acquire,
-                                    is_acquire_min_duts=True)
-            distinct_requests += 1
-
-        # Second pass tries to allocate duts to the rest unsatisfied requests.
-        for request in self.request_accountant.requests:
-            to_acquire = self.request_accountant.get_duts(request)
-            if to_acquire > 0:
-                self._acquire_hosts(request, to_acquire,
-                                    is_acquire_min_duts=False)
-
-        self.cache.record_stats()
-        logging.debug('Host acquisition stats: distinct requests: %s, leased '
-                      'hosts: %s, unsatisfied requests: %s', distinct_requests,
-                      self.leased_hosts_count, self.unsatisfied_requests)
-
-
-    @metrics.SecondsTimerDecorator(_rdb_timer_name % 'batch_validate_hosts')
-    def batch_validate_hosts(self, requests):
-        """Validate requests with hosts.
-
-        Reserve all hosts, check each one for validity and discard invalid
-        request-host pairings. Lease the remaining hsots.
-
-        @param requests: A list of requests to validate.
-
-        @raises RDBException: If multiple hosts or the wrong host is returned
-            for a response.
-        """
-        # The following cases are possible for frontend requests:
-        # 1. Multiple requests for 1 host, with different acls/deps/priority:
-        #    These form distinct requests because they hash differently.
-        #    The response map will contain entries like: {r1: h1, r2: h1}
-        #    after the batch_get_hosts call. There are 2 sub-cases:
-        #        a. Same deps/acls, different priority:
-        #           Since we sort the requests based on priority, the
-        #           higher priority request r1, will lease h1. The
-        #           validation of r2, h1 will fail because of the r1 lease.
-        #        b. Different deps/acls, only one of which matches the host:
-        #           The matching request will lease h1. The other host
-        #           pairing will get dropped from the response map.
-        # 2. Multiple requests with the same acls/deps/priority and 1 host:
-        #    These all have the same request hash, so the response map will
-        #    contain: {r: h}, regardless of the number of r's. If this is not
-        #    a valid host assignment it will get dropped from the response.
-        self.batch_get_hosts(set(requests))
-        for request in sorted(self.response_map.keys(),
-                key=lambda request: request.priority, reverse=True):
-            hosts = self.response_map[request]
-            if len(hosts) > 1:
-                raise rdb_utils.RDBException('Got multiple hosts for a single '
-                        'request. Hosts: %s, request %s.' % (hosts, request))
-            # Job-shard is 1:1 mapping. Because a job can only belongs
-            # to one shard, or belongs to master, we disallow frontend job
-            # that spans hosts on and off shards or across multiple shards,
-            # which would otherwise break the 1:1 mapping.
-            # As such, on master, if a request asks for multiple hosts and
-            # if any host is found on shard, we assume other requested hosts
-            # would also be on the same shard.  We can safely drop this request.
-            ignore_request = _is_master and any(
-                    [host.shard_id for host in hosts])
-            if (not ignore_request and
-                    (self.valid_host_assignment(request, hosts[0]) and
-                        self.lease_hosts(hosts))):
-                continue
-            del self.response_map[request]
-            logging.warning('Request %s was not able to lease host %s',
-                            request, hosts[0])
-
-
-# Request dispatchers: Create the appropriate request handler, send a list
-# of requests to one of its methods. The corresponding request handler in
-# rdb_lib must understand how to match each request with a response from a
-# dispatcher, the easiest way to achieve this is to returned the response_map
-# attribute of the request handler, after making the appropriate requests.
-def get_hosts(host_requests):
-    """Get host information about the requested hosts.
-
-    @param host_requests: A list of requests as defined in BaseHostRequest.
-    @return: A dictionary mapping each request to a list of hosts.
-    """
-    rdb_handler = BaseHostRequestHandler()
-    rdb_handler.batch_get_hosts(host_requests)
-    return rdb_handler.get_response()
-
-
-def update_hosts(update_requests):
-    """Update hosts.
-
-    @param update_requests: A list of updates to host tables
-        as defined in UpdateHostRequest.
-    """
-    rdb_handler = BaseHostRequestHandler()
-    rdb_handler.update_hosts(update_requests)
-    return rdb_handler.get_response()
-
-
-def rdb_host_request_dispatcher(host_requests):
-    """Dispatcher for all host acquisition queries.
-
-    @param host_requests: A list of requests for acquiring hosts, as defined in
-        AcquireHostRequest.
-    @return: A dictionary mapping each request to a list of hosts, or
-        an empty list if none could satisfy the request. Eg:
-        {AcquireHostRequest.template: [host_info_dictionaries]}
-    """
-    validation_requests = []
-    require_hosts_requests = []
-
-    # Validation requests are made by a job scheduled against a specific host
-    # specific host (eg: through the frontend) and only require the rdb to
-    # match the parameters of the host against the request. Acquisition
-    # requests are made by jobs that need hosts (eg: suites) and the rdb needs
-    # to find hosts matching the parameters of the request.
-    for request in host_requests:
-        if request.host_id:
-            validation_requests.append(request)
-        else:
-            require_hosts_requests.append(request)
-
-    rdb_handler = AvailableHostRequestHandler()
-    rdb_handler.batch_validate_hosts(validation_requests)
-    rdb_handler.batch_acquire_hosts(require_hosts_requests)
-    return rdb_handler.get_response()
diff --git a/scheduler/rdb_cache_manager.py b/scheduler/rdb_cache_manager.py
deleted file mode 100644
index 08f9a00..0000000
--- a/scheduler/rdb_cache_manager.py
+++ /dev/null
@@ -1,341 +0,0 @@
-# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-
-"""Cache module for rdb requests/host objects.
-
-This module supplies the following api:
-    1. A cache backend.
-    2. A cache manager for the backend.
-    3. A memoize decorator to encapsulate caching logic.
-
-This cache manager functions as a lookaside buffer for host requests.
-Its correctness is contingent on the following conditions:
-1. The concurrency of the rdb remains 0.
-2. Clients of the cache don't trust the leased bit on the cached object.
-3. The cache is created at the start of a single batched request,
-    populated during the request, and completely discarded at the end.
-
-Rather than caching individual hosts, the cache manager maintains
-'cache lines'. A cache line is defined as a key: value pair, where
-the key is as returned by get_key, and the value is a list of RDBHosts
-that match the key. The following limitations are placed on cache lines:
-1. A new line can only contain unleased hosts.
-2. A key can only be set once, with a single line, before removal.
-3. Every 'get' deletes the entire line.
-
-Consider the following examples:
-Normal case: 3 grouped requests, all with the same deps/acls, but different
-priorities/parent_job_ids. The requests (X+Y+Z) > matching hosts (K):
- (request1, count=X)- hits the database, takes X hosts, caches (K-X)
- (request2, count=Y) - hits the cache and is fully satisfied, caches (K-(X+Y))
- (request3, count=Z) - hits the cache, needs to acquire (X+Y+Z)-K next tick]:
-
- Host Count |  RDB                         | Cache
-------------------------------------------------------------------
-X:          | request1                     | {}
-K:          | find_hosts(deps, acls)       |
-X:          | leased_hosts                 |
-K-X:        | ---------------------------> | {key: [K-X hosts]}
-Y<K-X:      | request2 <---[K-X hosts]---- | {}
-Y:          | leased_hosts                 |
-K-(X+Y):    | ---------------------------> | {key: [K-(X+Y) hosts]}
-Z>K-(X+Y):  | request3 <-[K-(X+Y) hosts]-- | {}
-Z-(K-(X+Y)):| leased_hosts                 |
-
-Since hosts are only released by the scheduler there is no way the
-third request could have been satisfied completely even if we had checked
-the database real-time.
-
-Stale cache entries: 3 grouped requests that don't have the same deps/acls.
-P(1,2,3) are priorities, with P3 being the highest:
- (request1(deps=[a,b], P3), Count=X) - Caches hosts
- (request2(deps=[a], P2), Count=Y) - hits the database
- (request3(deps=[a,b], P1)], Count=Z) - Tries to use cached hosts but fails
-
- Host Count |  RDB                         | Cache
-------------------------------------------------------------------
-X:          | request1(deps=[a,b])         | {}
-K:          | find_hosts(deps=[a,b])       |
-X:          | leased_hosts                 |
-K-X:        | ---------------------------> | {deps=[a,b]: [(K-X) hosts]}
-Y<K-X:      | request2(deps=[a])           | {}
-K-X:        | find_hosts(deps=[a])         |
-Y:          | leased_hosts                 |
-K-(X+Y):    | ---------------------------> | {deps=[a]: [(K-(X+Y)) hosts],
-            |                              |        | overlap |
-            |                              |  deps=[a, b], [(K-X) hosts]}
-Z:          | request3(deps=[a,b])<-[K-X]--| {deps=[a]: [K-(X+Y) hosts]}
-Z-(K-(X+Y)):| leased_hosts                 | {deps=[a]: [N-Y hosts]}
-
-Note that in the last case, even though the cache returns hosts that
-have already been assigned to request2, request3 cannot use them. This is
-acceptable because the number of hosts we lease per tick is << the number
-of requests, so it's faster to check leased bits real time than query for hosts.
-"""
-
-
-import abc
-import collections
-import logging
-
-import common
-from autotest_lib.client.common_lib import utils
-from autotest_lib.client.common_lib.global_config import global_config
-from autotest_lib.scheduler import rdb_utils
-
-try:
-    from chromite.lib import metrics
-except ImportError:
-    metrics = utils.metrics_mock
-
-
-MEMOIZE_KEY = 'memoized_hosts'
-
-def memoize_hosts(func):
-    """Decorator used to memoize through the cache manager.
-
-    @param func: The function/method to decorate.
-        Before calling this function we check the cache for values matching
-        its request argument, and anything returned by the function is cached
-        cached under the same request.
-    """
-    def cache(self, request, count, **kwargs):
-        """Caching function for the memoize decorator.
-
-        @param request: The rdb request, as defined in rdb_requests.
-        @param count: The count of elements needed to satisfy the request.
-        @param kwargs:
-            Named args for the memoized function. This map should not contain
-            the key MEMOIZED_KEY, as this is reserved for the passing of
-            the cached/memoized hosts to the function itself.
-        """
-        cache_key = self.cache.get_key(request.deps, request.acls)
-        try:
-            kwargs[MEMOIZE_KEY] = self.cache.get_line(cache_key)
-        except rdb_utils.CacheMiss:
-            pass
-        hosts = func(self, request, count, **kwargs)
-        self.cache.set_line(cache_key, hosts)
-        return hosts
-    return cache
-
-
-class CacheBackend(object):
-    """Base class for a cache backend."""
-    __metaclass__ = abc.ABCMeta
-
-    def set(self, key, value):
-        """Set a key.
-
-        @param key: The key to set.
-        @param value: The value to cache.
-        """
-        pass
-
-
-    def get(self, key):
-        """Get the value stored under a key.
-
-        @param key: The key to retrieve the value for.
-        @return: The value stored under the key.
-        @raises KeyError: If the key isn't present in the cache.
-        """
-        pass
-
-
-    def delete(self, key):
-        """Delete the key, value pair from the cache.
-
-        @param key: The key used to find the key, value pair to delete.
-        @raises KeyError: If the key isn't already in the cache.
-        """
-        pass
-
-
-    def has_key(self, key):
-        """Check if the key exists in the cache.
-
-        @param key: The key to check.
-        @return: True if the key is in the cache.
-        """
-        return False
-
-
-class DummyCacheBackend(CacheBackend):
-    """A dummy cache backend.
-
-    This cache will claim to have no keys. Every get is a cache miss.
-    """
-
-    def get(self, key):
-        raise KeyError
-
-
-class InMemoryCacheBackend(CacheBackend):
-    """In memory cache backend.
-
-    Uses a simple dictionary to store key, value pairs.
-    """
-    def __init__(self):
-        self._cache = {}
-
-    def get(self, key):
-        return self._cache[key]
-
-    def set(self, key, value):
-        self._cache[key] = value
-
-    def delete(self, key):
-        self._cache.pop(key)
-
-    def has_key(self, key):
-        return key in self._cache
-
-# TODO: Implement a MemecacheBackend, invalidate when unleasing a host, refactor
-# the AcquireHostRequest to contain a core of (deps, acls) that we can use as
-# the key for population and invalidation. The caching manager is still valid,
-# regardless of the backend.
-
-class RDBHostCacheManager(object):
-    """RDB Cache manager."""
-
-    key = collections.namedtuple('key', ['deps', 'acls'])
-    use_cache = global_config.get_config_value(
-            'RDB', 'use_cache', type=bool, default=True)
-
-    def __init__(self):
-        self._cache_backend = (InMemoryCacheBackend()
-                               if self.use_cache else DummyCacheBackend())
-        self.hits = 0
-        self.misses = 0
-        self.stale_entries = []
-
-
-    def mean_staleness(self):
-        """Compute the average stale entries per line.
-
-        @return: A floating point representing the mean staleness.
-        """
-        return (reduce(lambda x, y: float(x+y), self.stale_entries)/
-                len(self.stale_entries)) if self.stale_entries else 0
-
-
-    def hit_ratio(self):
-        """Compute the hit ratio of this cache.
-
-        @return: A floating point percentage of the hit ratio.
-        """
-        if not self.hits and not self.misses:
-            return 0
-        requests = float(self.hits + self.misses)
-        return (self.hits/requests) * 100
-
-
-    def record_stats(self):
-        """Record stats about the cache managed by this instance."""
-        hit_ratio = self.hit_ratio()
-        staleness = self.mean_staleness()
-        logging.debug('Cache stats: hit ratio: %.2f%%, '
-                      'avg staleness per line: %.2f%%.', hit_ratio, staleness)
-        metrics.Float('chromeos/autotest/scheduler/rdb/cache/hit_ratio').set(
-                hit_ratio)
-        metrics.Float(
-                'chromeos/autotest/scheduler/rdb/cache/mean_staleness').set(
-                        staleness)
-
-
-    @classmethod
-    def get_key(cls, deps, acls):
-        """Return a key for the given deps, acls.
-
-        @param deps: A list of deps, as taken by the AcquireHostRequest.
-        @param acls: A list of acls, as taken by the AcquireHostRequest.
-        @return: A cache key for the given deps/acls.
-        """
-        # All requests with the same deps, acls should hit the same cache line.
-        # TODO: Do something smarter with acls, only one needs to match.
-        return cls.key(deps=frozenset(deps), acls=frozenset(acls))
-
-
-    def get_line(self, key):
-        """Clear and return the cache line matching the key.
-
-        @param key: The key the desired cache_line is stored under.
-        @return: A list of rdb hosts matching the key, or None.
-
-        @raises rdb_utils.CacheMiss: If the key isn't in the cache.
-        """
-        try:
-            cache_line = self._cache_backend.get(key)
-        except KeyError:
-            self.misses += 1
-            raise rdb_utils.CacheMiss('Key %s not in cache' % (key,))
-        self.hits += 1
-        self._cache_backend.delete(key)
-        return list(cache_line)
-
-
-    def _check_line(self, line, key):
-        """Sanity check a cache line.
-
-        This method assumes that a cache line is made up of RDBHost objects,
-        and checks to see if they all match each other/the key passed in.
-        Checking is done in terms of host labels and acls, note that the hosts
-        in the line can have different deps/acls, as long as they all have the
-        deps required by the key, and at least one matching acl of the key.
-
-        @param line: The cache line value.
-        @param key: The key the line will be stored under.
-        @raises rdb_utils.RDBException:
-            If one of the hosts in the cache line is already leased.
-            The cache already has a different line under the given key.
-            The given key doesn't match the hosts in the line.
-        """
-        # Note that this doesn't mean that all hosts in the cache are unleased.
-        if any(host.leased for host in line):
-            raise rdb_utils.RDBException('Cannot cache leased hosts %s' % line)
-
-        # Confirm that the given line can be used to service the key by checking
-        # that all hosts have the deps mentioned in the key, and at least one
-        # matching acl.
-        h_keys = set([self.get_key(host.labels, host.acls) for host in line])
-        for h_key in h_keys:
-            if (not h_key.deps.issuperset(key.deps) or
-                    not key.acls.intersection(h_key.acls)):
-                raise rdb_utils.RDBException('Given key: %s does not match key '
-                        'computed from hosts in line: %s' % (key, h_keys))
-        if self._cache_backend.has_key(key):
-            raise rdb_utils.RDBException('Cannot override a cache line. It '
-                    'must be cleared before setting. Key: %s, hosts %s' %
-                    (key, line))
-
-
-    def set_line(self, key, hosts):
-        """Cache a list of similar hosts.
-
-        set_line will no-op if:
-            The hosts aren't all unleased.
-            The hosts don't have deps/acls matching the key.
-            A cache line under the same key already exists.
-        The first 2 cases will lead to a cache miss in the corresponding get.
-
-        @param hosts: A list of unleased hosts with the same deps/acls.
-        @raises RDBException: If hosts is None, since None is reserved for
-            key expiration.
-        """
-        if hosts is None:
-            raise rdb_utils.RDBException('Cannot set None in the cache.')
-
-        # An empty list means no hosts matching the request are available.
-        # This can happen if a previous request leased all matching hosts.
-        if not hosts or not self.use_cache:
-            self._cache_backend.set(key, [])
-            return
-        try:
-            self._check_line(hosts, key)
-        except rdb_utils.RDBException as e:
-            logging.error(e)
-        else:
-            self._cache_backend.set(key, set(hosts))
diff --git a/scheduler/rdb_hosts.py b/scheduler/rdb_hosts.py
deleted file mode 100644
index 30da146..0000000
--- a/scheduler/rdb_hosts.py
+++ /dev/null
@@ -1,419 +0,0 @@
-# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""RDB Host objects.
-
-RDBHost: Basic host object, capable of retrieving fields of a host that
-correspond to columns of the host table.
-
-RDBServerHostWrapper: Server side host adapters that help in making a raw
-database host object more ameanable to the classes and functions in the rdb
-and/or rdb clients.
-
-RDBClientHostWrapper: Scheduler host proxy that converts host information
-returned by the rdb into a client host object capable of proxying updates
-back to the rdb.
-"""
-
-import logging
-
-from django.core import exceptions as django_exceptions
-
-import common
-from autotest_lib.client.common_lib import utils
-from autotest_lib.frontend.afe import rdb_model_extensions as rdb_models
-from autotest_lib.frontend.afe import models as afe_models
-from autotest_lib.scheduler import rdb_requests
-from autotest_lib.scheduler import rdb_utils
-from autotest_lib.server import constants
-from autotest_lib.utils import labellib
-
-try:
-    from chromite.lib import metrics
-except ImportError:
-    metrics = utils.metrics_mock
-
-
-class RDBHost(object):
-    """A python host object representing a django model for the host."""
-
-    required_fields = set(
-            rdb_models.AbstractHostModel.get_basic_field_names() + ['id'])
-
-
-    def _update_attributes(self, new_attributes):
-        """Updates attributes based on an input dictionary.
-
-        Since reads are not proxied to the rdb this method caches updates to
-        the host tables as class attributes.
-
-        @param new_attributes: A dictionary of attributes to update.
-        """
-        for name, value in new_attributes.iteritems():
-            setattr(self, name, value)
-
-
-    def __init__(self, **kwargs):
-        if self.required_fields - set(kwargs.keys()):
-            raise rdb_utils.RDBException('Creating %s requires %s, got %s '
-                    % (self.__class__, self.required_fields, kwargs.keys()))
-        self._update_attributes(kwargs)
-
-
-    @classmethod
-    def get_required_fields_from_host(cls, host):
-        """Returns all required attributes of the host parsed into a dict.
-
-        Required attributes are defined as the attributes required to
-        create an RDBHost, and mirror the columns of the host table.
-
-        @param host: A host object containing all required fields as attributes.
-        """
-        required_fields_map = {}
-        try:
-            for field in cls.required_fields:
-                required_fields_map[field] = getattr(host, field)
-        except AttributeError as e:
-            raise rdb_utils.RDBException('Required %s' % e)
-        required_fields_map['id'] = host.id
-        return required_fields_map
-
-
-    def wire_format(self):
-        """Returns information about this host object.
-
-        @return: A dictionary of fields representing the host.
-        """
-        return RDBHost.get_required_fields_from_host(self)
-
-
-class RDBServerHostWrapper(RDBHost):
-    """A host wrapper for the base host object.
-
-    This object contains all the attributes of the raw database columns,
-    and a few more that make the task of host assignment easier. It handles
-    the following duties:
-        1. Serialization of the host object and foreign keys
-        2. Conversion of label ids to label names, and retrieval of platform
-        3. Checking the leased bit/status of a host before leasing it out.
-    """
-
-    def __init__(self, host):
-        """Create an RDBServerHostWrapper.
-
-        @param host: An instance of the Host model class.
-        """
-        host_fields = RDBHost.get_required_fields_from_host(host)
-        super(RDBServerHostWrapper, self).__init__(**host_fields)
-        self.labels = rdb_utils.LabelIterator(host.labels.all())
-        self.acls = [aclgroup.id for aclgroup in host.aclgroup_set.all()]
-        self.protection = host.protection
-        platform = host.platform()
-        # Platform needs to be a method, not an attribute, for
-        # backwards compatibility with the rest of the host model.
-        self.platform_name = platform.name if platform else None
-        self.shard_id = host.shard_id
-
-
-    def refresh(self, fields=None):
-        """Refresh the attributes on this instance.
-
-        @param fields: A list of fieldnames to refresh. If None
-            all the required fields of the host are refreshed.
-
-        @raises RDBException: If refreshing a field fails.
-        """
-        # TODO: This is mainly required for cache correctness. If it turns
-        # into a bottleneck, cache host_ids instead of rdbhosts and rebuild
-        # the hosts once before leasing them out. The important part is to not
-        # trust the leased bit on a cached host.
-        fields = self.required_fields if not fields else fields
-        try:
-            refreshed_fields = afe_models.Host.objects.filter(
-                    id=self.id).values(*fields)[0]
-        except django_exceptions.FieldError as e:
-            raise rdb_utils.RDBException('Couldn\'t refresh fields %s: %s' %
-                    fields, e)
-        self._update_attributes(refreshed_fields)
-
-
-    def lease(self):
-        """Set the leased bit on the host object, and in the database.
-
-        @raises RDBException: If the host is already leased.
-        """
-        self.refresh(fields=['leased'])
-        if self.leased:
-            raise rdb_utils.RDBException('Host %s is already leased' %
-                                         self.hostname)
-        self.leased = True
-        # TODO: Avoid leaking django out of rdb.QueryManagers. This is still
-        # preferable to calling save() on the host object because we're only
-        # updating/refreshing a single indexed attribute, the leased bit.
-        afe_models.Host.objects.filter(id=self.id).update(leased=self.leased)
-
-
-    def wire_format(self, unwrap_foreign_keys=True):
-        """Returns all information needed to scheduler jobs on the host.
-
-        @param unwrap_foreign_keys: If true this method will retrieve and
-            serialize foreign keys of the original host, which are stored
-            in the RDBServerHostWrapper as iterators.
-
-        @return: A dictionary of host information.
-        """
-        host_info = super(RDBServerHostWrapper, self).wire_format()
-
-        if unwrap_foreign_keys:
-            host_info['labels'] = self.labels.get_label_names()
-            host_info['acls'] = self.acls
-            host_info['platform_name'] = self.platform_name
-            host_info['protection'] = self.protection
-        return host_info
-
-
-class RDBClientHostWrapper(RDBHost):
-    """A client host wrapper for the base host object.
-
-    This wrapper is used whenever the queue entry needs direct access
-    to the host.
-    """
-    # Shows more detailed status of what a DUT is doing.
-    _HOST_WORKING_METRIC = 'chromeos/autotest/dut_working'
-    # Shows which hosts are working.
-    _HOST_STATUS_METRIC = 'chromeos/autotest/dut_status'
-    # Maps duts to pools.
-    _HOST_POOL_METRIC = 'chromeos/autotest/dut_pool'
-    # Shows which scheduler machines are using a DUT.
-    _BOARD_SHARD_METRIC = 'chromeos/autotest/shard/board_presence'
-
-
-    def __init__(self, **kwargs):
-
-        # This class is designed to only check for the bare minimum
-        # attributes on a host, so if a client tries accessing an
-        # unpopulated foreign key it will result in an exception. Doing
-        # so makes it easier to add fields to the rdb host without
-        # updating all the clients.
-        super(RDBClientHostWrapper, self).__init__(**kwargs)
-
-        # TODO(beeps): Remove this once we transition to urls
-        from autotest_lib.scheduler import rdb
-        self.update_request_manager = rdb_requests.RDBRequestManager(
-                rdb_requests.UpdateHostRequest, rdb.update_hosts)
-        self.dbg_str = ''
-        self.metadata = {}
-        # We access labels for metrics generation below and it's awkward not
-        # knowing if labels were populated or not.
-        if not hasattr(self, 'labels'):
-            self.labels = ()
-
-
-
-    def _update(self, payload):
-        """Send an update to rdb, save the attributes of the payload locally.
-
-        @param: A dictionary representing 'key':value of the update required.
-
-        @raises RDBException: If the update fails.
-        """
-        logging.info('Host %s in %s updating %s through rdb on behalf of: %s ',
-                     self.hostname, self.status, payload, self.dbg_str)
-        self.update_request_manager.add_request(host_id=self.id,
-                payload=payload)
-        for response in self.update_request_manager.response():
-            if response:
-                raise rdb_utils.RDBException('Host %s unable to perform update '
-                        '%s through rdb on behalf of %s: %s',  self.hostname,
-                        payload, self.dbg_str, response)
-        super(RDBClientHostWrapper, self)._update_attributes(payload)
-
-
-    def get_metric_fields(self):
-        """Generate default set of fields to include for Monarch.
-
-        @return: Dictionary of default fields.
-        """
-        fields = {
-            'dut_host_name': self.hostname,
-            'board': self.board,
-            'model': self._model,
-        }
-
-        return fields
-
-
-    def record_pool(self, fields):
-        """Report to Monarch current pool of dut.
-
-        @param fields   Dictionary of fields to include.
-        """
-        pool = ''
-        if len(self.pools) == 1:
-            pool = self.pools[0]
-        if pool in constants.Pools.MANAGED_POOLS:
-            pool = 'managed:' + pool
-
-        metrics.String(self._HOST_POOL_METRIC,
-                       reset_after=True).set(pool, fields=fields)
-
-
-    def set_status(self, status):
-        """Proxy for setting the status of a host via the rdb.
-
-        @param status: The new status.
-        """
-        # Update elasticsearch db.
-        self._update({'status': status})
-
-        # Update Monarch.
-        fields = self.get_metric_fields()
-        self.record_pool(fields)
-        # As each device switches state, indicate that it is not in any
-        # other state.  This allows Monarch queries to avoid double counting
-        # when additional points are added by the Window Align operation.
-        host_status_metric = metrics.Boolean(
-                self._HOST_STATUS_METRIC, reset_after=True)
-        for s in rdb_models.AbstractHostModel.Status.names:
-            fields['status'] = s
-            host_status_metric.set(s == status, fields=fields)
-
-
-    def record_working_state(self, working, timestamp):
-        """Report to Monarch whether we are working or broken.
-
-        @param working    Host repair status. `True` means that the DUT
-                          is up and expected to pass tests.  `False`
-                          means the DUT has failed repair and requires
-                          manual intervention.
-        @param timestamp  Time that the status was recorded.
-        """
-        fields = self.get_metric_fields()
-        metrics.Boolean(
-                self._HOST_WORKING_METRIC, reset_after=True).set(
-                        working, fields=fields)
-        metrics.Boolean(self._BOARD_SHARD_METRIC, reset_after=True).set(
-                True,
-                fields={
-                        'board': self.board,
-                        'model': self._model,
-                },
-        )
-        self.record_pool(fields)
-
-
-    def update_field(self, fieldname, value):
-        """Proxy for updating a field on the host.
-
-        @param fieldname: The fieldname as a string.
-        @param value: The value to assign to the field.
-        """
-        self._update({fieldname: value})
-
-
-    def platform_and_labels(self):
-        """Get the platform and labels on this host.
-
-        @return: A tuple containing a list of label names and the platform name.
-        """
-        platform = self.platform_name
-        labels = [label for label in self.labels if label != platform]
-        return platform, labels
-
-
-    def platform(self):
-        """Get the name of the platform of this host.
-
-        @return: A string representing the name of the platform.
-        """
-        return self.platform_name
-
-
-    @property
-    def board(self):
-        """Get the names of the board of this host.
-
-        @return: A string of the name of the board, e.g., lumpy. Returns '' if
-                no board label is found.
-        """
-        labels = labellib.LabelsMapping(self.labels)
-        return labels.get('board', '')
-
-
-    @property
-    def _model(self):
-        """Get the model this host.
-
-        @return: A string of the name of the model, e.g., robo360. Returns '' if
-                no model label is found.
-        """
-        labels = labellib.LabelsMapping(self.labels)
-        return labels.get('model', '')
-
-
-    @property
-    def pools(self):
-        """Get the names of the pools of this host.
-
-        @return: A list of pool names that the host is assigned to.
-        """
-        return [l[len(constants.Labels.POOL_PREFIX):] for l in self.labels
-                if l.startswith(constants.Labels.POOL_PREFIX)]
-
-
-    def get_object_dict(self, **kwargs):
-        """Serialize the attributes of this object into a dict.
-
-        This method is called through frontend code to get a serialized
-        version of this object.
-
-        @param kwargs:
-            extra_fields: Extra fields, outside the columns of a host table.
-
-        @return: A dictionary representing the fields of this host object.
-        """
-        # TODO(beeps): Implement support for extra fields. Currently nothing
-        # requires them.
-        return self.wire_format()
-
-
-    def save(self):
-        """Save any local data a client of this host object might have saved.
-
-        Setting attributes on a model before calling its save() method is a
-        common django pattern. Most, if not all updates to the host happen
-        either through set status or update_field. Though we keep the internal
-        state of the RDBClientHostWrapper consistent through these updates
-        we need a bulk save method such as this one to save any attributes of
-        this host another model might have set on it before calling its own
-        save method. Eg:
-            task = ST.objects.get(id=12)
-            task.host.status = 'Running'
-            task.save() -> this should result in the hosts status changing to
-            Running.
-
-        Functions like add_host_to_labels will have to update this host object
-        differently, as that is another level of foreign key indirection.
-        """
-        self._update(self.get_required_fields_from_host(self))
-
-
-def return_rdb_host(func):
-    """Decorator for functions that return a list of Host objects.
-
-    @param func: The decorated function.
-    @return: A functions capable of converting each host_object to a
-        rdb_hosts.RDBServerHostWrapper.
-    """
-    def get_rdb_host(*args, **kwargs):
-        """Takes a list of hosts and returns a list of host_infos.
-
-        @param hosts: A list of hosts. Each host is assumed to contain
-            all the fields in a host_info defined above.
-        @return: A list of rdb_hosts.RDBServerHostWrappers, one per host, or an
-            empty list is no hosts were found..
-        """
-        hosts = func(*args, **kwargs)
-        return [RDBServerHostWrapper(host) for host in hosts]
-    return get_rdb_host
diff --git a/scheduler/rdb_lib.py b/scheduler/rdb_lib.py
deleted file mode 100644
index 8f0ee73..0000000
--- a/scheduler/rdb_lib.py
+++ /dev/null
@@ -1,111 +0,0 @@
-# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""This module manages translation between monitor_db and the rdb. """
-
-import common
-from autotest_lib.scheduler import rdb
-from autotest_lib.scheduler import rdb_hosts
-from autotest_lib.scheduler import rdb_requests
-from autotest_lib.server.cros import provision
-
-
-# Adapters for scheduler specific objects: Convert job information to a
-# format more ameanable to the rdb/rdb request managers.
-class JobQueryManager(object):
-    """A caching query manager for all job related information."""
-    def __init__(self, queue_entries, suite_min_duts=None):
-        """Initialize.
-
-        @param queue_entries: A list of HostQueueEntry objects.
-        @param suite_min_duts: A dictionary where the key is suite job id,
-                and the value is the value of 'suite_min_dut' in the suite's
-                job keyvals. It should cover all the suite jobs which
-                the jobs (associated with the queue_entries) belong to.
-        """
-        # TODO(beeps): Break this dependency on the host_query_manager,
-        # crbug.com/336934.
-        from autotest_lib.scheduler import query_managers
-        self.query_manager = query_managers.AFEHostQueryManager()
-        jobs = [queue_entry.job_id for queue_entry in queue_entries]
-        self._job_acls = self.query_manager._get_job_acl_groups(jobs)
-        self._job_deps = self.query_manager._get_job_dependencies(jobs)
-        self._labels = self.query_manager._get_labels(self._job_deps)
-        self._suite_min_duts = suite_min_duts or {}
-
-
-    def get_job_info(self, queue_entry):
-        """Extract job information from a queue_entry/host-scheduler.
-
-        @param queue_entry: The queue_entry for which we need job information.
-
-        @return: A dictionary representing job related information.
-        """
-        job_id = queue_entry.job_id
-        job_deps, job_preferred_deps = [], []
-        for dep in self._job_deps.get(job_id, []):
-            if not provision.is_for_special_action(self._labels[dep].name):
-                job_deps.append(dep)
-            elif provision.Provision.acts_on(self._labels[dep].name):
-                job_preferred_deps.append(dep)
-
-        job_acls = self._job_acls.get(job_id, [])
-        parent_id = queue_entry.job.parent_job_id
-        min_duts = self._suite_min_duts.get(parent_id, 0) if parent_id else 0
-
-        return {'deps': job_deps, 'acls': job_acls,
-                'preferred_deps': job_preferred_deps,
-                'host_id': queue_entry.host_id,
-                'parent_job_id': queue_entry.job.parent_job_id,
-                'priority': queue_entry.job.priority,
-                'suite_min_duts': min_duts}
-
-
-def acquire_hosts(queue_entries, suite_min_duts=None):
-    """Acquire hosts for the list of queue_entries.
-
-    The act of acquisition involves leasing a host from the rdb.
-
-    @param queue_entries: A list of queue_entries that need hosts.
-    @param suite_min_duts: A dictionary that maps suite job id to the minimum
-                           number of duts required.
-
-    @yield: An rdb_hosts.RDBClientHostWrapper for each host acquired on behalf
-        of a queue_entry, or None if a host wasn't found.
-
-    @raises RDBException: If something goes wrong making the request.
-    """
-    job_query_manager = JobQueryManager(queue_entries, suite_min_duts)
-    request_manager = rdb_requests.BaseHostRequestManager(
-            rdb_requests.AcquireHostRequest, rdb.rdb_host_request_dispatcher)
-    for entry in queue_entries:
-        request_manager.add_request(**job_query_manager.get_job_info(entry))
-
-    for host in request_manager.response():
-        yield (rdb_hosts.RDBClientHostWrapper(**host)
-               if host else None)
-
-
-def get_hosts(host_ids):
-    """Get information about the hosts with ids in host_ids.
-
-    get_hosts is different from acquire_hosts in that it is completely
-    oblivious to the leased state of a host.
-
-    @param host_ids: A list of host_ids.
-
-    @return: A list of rdb_hosts.RDBClientHostWrapper objects.
-
-    @raises RDBException: If something goes wrong in making the request.
-    """
-    request_manager = rdb_requests.BaseHostRequestManager(
-            rdb_requests.HostRequest, rdb.get_hosts)
-    for host_id in host_ids:
-        request_manager.add_request(host_id=host_id)
-
-    hosts = []
-    for host in request_manager.response():
-        hosts.append(rdb_hosts.RDBClientHostWrapper(**host)
-                     if host else None)
-    return hosts
diff --git a/scheduler/rdb_requests.py b/scheduler/rdb_requests.py
deleted file mode 100644
index 21ad716..0000000
--- a/scheduler/rdb_requests.py
+++ /dev/null
@@ -1,214 +0,0 @@
-# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""RDB request managers and requests.
-
-RDB request managers: Call an rdb api_method with a list of RDBRequests, and
-match the requests to the responses returned.
-
-RDB Request classes: Used in conjunction with the request managers. Each class
-defines the set of fields the rdb needs to fulfill the request, and a hashable
-request object the request managers use to identify a response with a request.
-"""
-
-import collections
-
-import common
-from autotest_lib.scheduler import rdb_utils
-
-
-class RDBRequestManager(object):
-    """Base request manager for RDB requests.
-
-    Each instance of a request manager is associated with one request, and
-    one api call. All subclasses maintain a queue of unexecuted requests, and
-    and expose an api to add requests/retrieve the response for these requests.
-    """
-
-
-    def __init__(self, request, api_call):
-        """
-        @param request: A subclass of rdb_utls.RDBRequest. The manager can only
-            manage requests of one type.
-        @param api_call: The rdb api call this manager is expected to make.
-            A manager can only send requests of type request, to this api call.
-        """
-        self.request = request
-        self.api_call = api_call
-        self.request_queue = []
-
-
-    def add_request(self, **kwargs):
-        """Add an RDBRequest to the queue."""
-        self.request_queue.append(self.request(**kwargs).get_request())
-
-
-    def response(self):
-        """Execute the api call and return a response for each request.
-
-        The order of responses is the same as the order of requests added
-        to the queue.
-
-        @yield: A response for each request added to the queue after the
-            last invocation of response.
-        """
-        if not self.request_queue:
-            raise rdb_utils.RDBException('No requests. Call add_requests '
-                    'with the appropriate kwargs, before calling response.')
-
-        result = self.api_call(self.request_queue)
-        requests = self.request_queue
-        self.request_queue = []
-        for request in requests:
-            yield result.get(request) if result else None
-
-
-class BaseHostRequestManager(RDBRequestManager):
-    """Manager for batched get requests on hosts."""
-
-
-    def response(self):
-        """Yields a popped host from the returned host list."""
-
-        # As a side-effect of returning a host, this method also removes it
-        # from the list of hosts matched up against a request. Eg:
-        #    hqes: [hqe1, hqe2, hqe3]
-        #    client requests: [c_r1, c_r2, c_r3]
-        #    generate requests in rdb: [r1 (c_r1 and c_r2), r2]
-        #    and response {r1: [h1, h2], r2:[h3]}
-        # c_r1 and c_r2 need to get different hosts though they're the same
-        # request, because they're from different queue_entries.
-        for hosts in super(BaseHostRequestManager, self).response():
-            yield hosts.pop() if hosts else None
-
-
-class RDBRequestMeta(type):
-    """Metaclass for constructing rdb requests.
-
-    This meta class creates a read-only request template by combining the
-    request_arguments of all classes in the inheritence hierarchy into a
-    namedtuple.
-    """
-    def __new__(cls, name, bases, dctn):
-        for base in bases:
-            try:
-                dctn['_request_args'].update(base._request_args)
-            except AttributeError:
-                pass
-        dctn['template'] = collections.namedtuple('template',
-                                                  dctn['_request_args'])
-        return type.__new__(cls, name, bases, dctn)
-
-
-class RDBRequest(object):
-    """Base class for an rdb request.
-
-    All classes inheriting from RDBRequest will need to specify a list of
-    request_args necessary to create the request, and will in turn get a
-    request that the rdb understands.
-    """
-    __metaclass__ = RDBRequestMeta
-    __slots__ = set(['_request_args', '_request'])
-    _request_args = set([])
-
-
-    def __init__(self, **kwargs):
-        for key,value in kwargs.iteritems():
-            try:
-                hash(value)
-            except TypeError as e:
-                raise rdb_utils.RDBException('All fields of a %s must be. '
-                        'hashable %s: %s, %s failed this test.' %
-                        (self.__class__, key, type(value), value))
-        try:
-            self._request = self.template(**kwargs)
-        except TypeError:
-            raise rdb_utils.RDBException('Creating %s requires args %s got %s' %
-                    (self.__class__, self.template._fields, kwargs.keys()))
-
-
-    def get_request(self):
-        """Returns a request that the rdb understands.
-
-        @return: A named tuple with all the fields necessary to make a request.
-        """
-        return self._request
-
-
-class HashableDict(dict):
-    """A hashable dictionary.
-
-    This class assumes all values of the input dict are hashable.
-    """
-
-    def __hash__(self):
-        return hash(tuple(sorted(self.items())))
-
-
-class HostRequest(RDBRequest):
-    """Basic request for information about a single host.
-
-    Eg: HostRequest(host_id=x): Will return all information about host x.
-    """
-    _request_args =  set(['host_id'])
-
-
-class UpdateHostRequest(HostRequest):
-    """Defines requests to update hosts.
-
-    Eg:
-        UpdateHostRequest(host_id=x, payload={'afe_hosts_col_name': value}):
-            Will update column afe_hosts_col_name with the given value, for
-            the given host_id.
-
-    @raises RDBException: If the input arguments don't contain the expected
-        fields to make the request, or are of the wrong type.
-    """
-    _request_args = set(['payload'])
-
-
-    def __init__(self, **kwargs):
-        try:
-            kwargs['payload'] = HashableDict(kwargs['payload'])
-        except (KeyError, TypeError) as e:
-            raise rdb_utils.RDBException('Creating %s requires args %s got %s' %
-                    (self.__class__, self.template._fields, kwargs.keys()))
-        super(UpdateHostRequest, self).__init__(**kwargs)
-
-
-class AcquireHostRequest(HostRequest):
-    """Defines requests to acquire hosts.
-
-    Eg:
-        AcquireHostRequest(host_id=None, deps=[d1, d2], acls=[a1, a2],
-                priority=None, parent_job_id=None): Will acquire and return a
-                host that matches the specified deps/acls.
-        AcquireHostRequest(host_id=x, deps=[d1, d2], acls=[a1, a2]) : Will
-            acquire and return host x, after checking deps/acls match.
-
-    @raises RDBException: If the the input arguments don't contain the expected
-        fields to make a request, or are of the wrong type.
-    """
-    # TODO(beeps): Priority and parent_job_id shouldn't be a part of the
-    # core request.
-    _request_args = set(['priority', 'deps', 'preferred_deps', 'acls',
-                         'parent_job_id', 'suite_min_duts'])
-
-
-    def __init__(self, **kwargs):
-        try:
-            kwargs['deps'] = frozenset(kwargs['deps'])
-            kwargs['preferred_deps'] = frozenset(kwargs['preferred_deps'])
-            kwargs['acls'] = frozenset(kwargs['acls'])
-
-            # parent_job_id defaults to NULL but always serializing it as an int
-            # fits the rdb's type assumptions. Note that job ids are 1 based.
-            if kwargs['parent_job_id'] is None:
-                kwargs['parent_job_id'] = 0
-        except (KeyError, TypeError) as e:
-            raise rdb_utils.RDBException('Creating %s requires args %s got %s' %
-                    (self.__class__, self.template._fields, kwargs.keys()))
-        super(AcquireHostRequest, self).__init__(**kwargs)
-
-
diff --git a/scheduler/rdb_utils.py b/scheduler/rdb_utils.py
deleted file mode 100644
index 6b952e2..0000000
--- a/scheduler/rdb_utils.py
+++ /dev/null
@@ -1,186 +0,0 @@
-# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""RDB utilities.
-
-Do not import rdb or autotest modules here to avoid cyclic dependencies.
-"""
-
-import collections
-
-import common
-from autotest_lib.client.common_lib import priorities
-from autotest_lib.client.common_lib import utils
-
-try:
-    from chromite.lib import metrics
-except ImportError:
-    metrics = utils.metrics_mock
-
-
-RDB_STATS_KEY = 'rdb'
-
-class RDBException(Exception):
-    """Generic RDB exception."""
-
-    def wire_format(self, **kwargs):
-        """Convert the exception to a format better suited to an rpc response.
-        """
-        return str(self)
-
-
-class CacheMiss(RDBException):
-    """Generic exception raised for a cache miss in the rdb."""
-    pass
-
-
-class LabelIterator(object):
-    """An Iterator for labels.
-
-    Within the rdb any label/dependency comparisons are performed based on label
-    ids. However, the host object returned needs to contain label names instead.
-    This class returns label ids for iteration, but a list of all label names
-    when accessed through get_label_names.
-    """
-
-    def __init__(self, labels):
-        self.labels = labels
-
-
-    def __iter__(self):
-        return iter(label.id for label in self.labels)
-
-
-    def get_label_names(self):
-        """Get all label names of the labels associated with this class.
-
-        @return: A list of label names.
-        """
-        return [label.name for label in self.labels]
-
-
-class RequestAccountant(object):
-    """A helper class that count requests and manages min_duts requirement.
-
-    On initialization, this object takes a list of host requests.
-    It will batch the requests by grouping similar requests together
-    and generate a mapping from unique request-> count of the request.
-    It will also generates a mapping from suite_job_id -> min_duts.
-
-    RDB does a two-round of host aquisition. The first round attempts
-    to get min_duts for each suite. The second round attemps to satisfy
-    the rest requests.  RDB calls get_min_duts and get_rest to
-    figure out how many duts it should attempt to get for a unique
-    request in the first and second round respectively.
-
-    Assume we have two distinct requests
-          R1 (parent_job_id: 10, need hosts: 2)
-          R2 (parent_job_id: 10, need hosts: 4)
-    And parent job P (job_id:10) has min dut requirement of 3. So we got
-          requests_to_counts = {R1: 2, R2: 4}
-          min_duts_map = {P: 3}
-
-    First round acquiring:
-    Call get_min_duts(R1)
-          return 2, because P hasn't reach its min dut limit (3) yet
-          requests_to_counts -> {R1: 2-2=0, R2: 4}
-          min_duts_map -> {P: 3-2=1}
-
-    Call get_min_duts(R2)
-         return 1, because although R2 needs 4 duts, P's min dut limit is now 1
-          requests_to_counts -> {R1: 0, R2: 4-1=3}
-          min_duts_map -> {P: 1-1=0}
-
-    Second round acquiring:
-    Call get_rest(R1):
-         return 0, requests_to_counts[R1]
-    Call get_rest(R2):
-         return 3, requests_to_counts[R2]
-
-    Note it is possible that in the first round acquiring, although
-    R1 requested 2 duts, it may only get 1 or None. However get_rest
-    doesn't need to care whether the first round succeeded or not, as
-    in the case when the first round failed, regardless how many duts
-    get_rest requests, it will not be fullfilled anyway.
-    """
-
-    _host_ratio_metric = metrics.Float(
-            'chromeos/autotest/scheduler/rdb/host_acquisition_ratio')
-
-
-    def __init__(self, host_requests):
-        """Initialize.
-
-        @param host_requests: A list of request to acquire hosts.
-        """
-        self.requests_to_counts = {}
-        # The order matters, it determines which request got fullfilled first.
-        self.requests = []
-        for request, count in self._batch_requests(host_requests):
-            self.requests.append(request)
-            self.requests_to_counts[request] = count
-        self.min_duts_map = dict(
-                (r.parent_job_id, r.suite_min_duts)
-                for r in self.requests_to_counts.iterkeys() if r.parent_job_id)
-
-
-    @classmethod
-    def _batch_requests(cls, requests):
-        """ Group similar requests, sort by priority and parent_job_id.
-
-        @param requests: A list or unsorted, unordered requests.
-
-        @return: A list of tuples of the form (request, number of occurances)
-            formed by counting the number of requests with the same acls/deps/
-            priority in the input list of requests, and sorting by priority.
-            The order of this list ensures against priority inversion.
-        """
-        sort_function = lambda request: (request[0].priority,
-                                         -request[0].parent_job_id)
-        return sorted(collections.Counter(requests).items(), key=sort_function,
-                      reverse=True)
-
-
-    def get_min_duts(self, host_request):
-        """Given a distinct host request figure out min duts to request for.
-
-        @param host_request: A request.
-        @returns: The minimum duts that should be requested.
-        """
-        parent_id = host_request.parent_job_id
-        count = self.requests_to_counts[host_request]
-        if parent_id:
-            min_duts = self.min_duts_map.get(parent_id, 0)
-            to_acquire = min(count, min_duts)
-            self.min_duts_map[parent_id] = max(0, min_duts - to_acquire)
-        else:
-            to_acquire = 0
-        self.requests_to_counts[host_request] -= to_acquire
-        return to_acquire
-
-
-    def get_duts(self, host_request):
-        """Return the number of duts host_request still need.
-
-        @param host_request: A request.
-        @returns: The number of duts need to be requested.
-        """
-        return self.requests_to_counts[host_request]
-
-
-    # Note: Possibly this code is dead, see crbug.com/738508 for
-    # context.
-    def record_acquire_min_duts(cls, host_request, hosts_required,
-                                acquired_host_count):
-        """Send stats about host acquisition.
-
-        @param host_request: A request.
-        @param hosts_required: Number of hosts required to satisfy request.
-        @param acquired_host_count: Number of host acquired.
-        """
-        try:
-            priority =  priorities.Priority.get_string(host_request.priority)
-        except ValueError:
-            return
-        cls._host_ratio_metric.set(acquired_host_count/float(hosts_required))
diff --git a/scheduler/scheduler_models.py b/scheduler/scheduler_models.py
deleted file mode 100644
index b968fab..0000000
--- a/scheduler/scheduler_models.py
+++ /dev/null
@@ -1,1363 +0,0 @@
-# pylint: disable=missing-docstring
-
-"""Database model classes for the scheduler.
-
-Contains model classes abstracting the various DB tables used by the scheduler.
-These overlap the Django models in basic functionality, but were written before
-the Django models existed and have not yet been phased out.  Some of them
-(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
-which would probably be ill-suited for inclusion in the general Django model
-classes.
-
-Globals:
-_notify_email_statuses: list of HQE statuses.  each time a single HQE reaches
-        one of these statuses, an email will be sent to the job's email_list.
-        comes from global_config.
-_base_url: URL to the local AFE server, used to construct URLs for emails.
-_db: DatabaseConnection for this module.
-_drone_manager: reference to global DroneManager instance.
-"""
-
-import base64
-import datetime
-import errno
-import itertools
-import logging
-import re
-import weakref
-
-import google.protobuf.internal.well_known_types as types
-
-from autotest_lib.client.common_lib import global_config, host_protections
-from autotest_lib.client.common_lib import time_utils
-from autotest_lib.client.common_lib import utils
-from autotest_lib.frontend.afe import models, model_attributes
-from autotest_lib.scheduler import drone_manager
-from autotest_lib.scheduler import rdb_lib
-from autotest_lib.scheduler import scheduler_config
-from autotest_lib.scheduler import scheduler_lib
-from autotest_lib.server import afe_urls
-from autotest_lib.server.cros import provision
-
-try:
-    from chromite.lib import metrics
-    from chromite.lib import cloud_trace
-except ImportError:
-    metrics = utils.metrics_mock
-    import mock
-    cloud_trace = mock.Mock()
-
-
-_notify_email_statuses = []
-_base_url = None
-
-_db = None
-_drone_manager = None
-
-RESPECT_STATIC_LABELS = global_config.global_config.get_config_value(
-        'SKYLAB', 'respect_static_labels', type=bool, default=False)
-
-
-def initialize():
-    global _db
-    _db = scheduler_lib.ConnectionManager().get_connection()
-
-    notify_statuses_list = global_config.global_config.get_config_value(
-            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
-            default='')
-    global _notify_email_statuses
-    _notify_email_statuses = [status for status in
-                              re.split(r'[\s,;:]', notify_statuses_list.lower())
-                              if status]
-
-    # AUTOTEST_WEB.base_url is still a supported config option as some people
-    # may wish to override the entire url.
-    global _base_url
-    config_base_url = global_config.global_config.get_config_value(
-            scheduler_config.CONFIG_SECTION, 'base_url', default='')
-    if config_base_url:
-        _base_url = config_base_url
-    else:
-        _base_url = afe_urls.ROOT_URL
-
-    initialize_globals()
-
-
-def initialize_globals():
-    global _drone_manager
-    _drone_manager = drone_manager.instance()
-
-
-def get_job_metadata(job):
-    """Get a dictionary of the job information.
-
-    The return value is a dictionary that includes job information like id,
-    name and parent job information. The value will be stored in metadata
-    database.
-
-    @param job: A Job object.
-    @return: A dictionary containing the job id, owner and name.
-    """
-    if not job:
-        logging.error('Job is None, no metadata returned.')
-        return {}
-    try:
-        return {'job_id': job.id,
-                'owner': job.owner,
-                'job_name': job.name,
-                'parent_job_id': job.parent_job_id}
-    except AttributeError as e:
-        logging.error('Job has missing attribute: %s', e)
-        return {}
-
-
-class DBError(Exception):
-    """Raised by the DBObject constructor when its select fails."""
-
-
-class DBObject(object):
-    """A miniature object relational model for the database."""
-
-    # Subclasses MUST override these:
-    _table_name = ''
-    _fields = ()
-
-    # A mapping from (type, id) to the instance of the object for that
-    # particular id.  This prevents us from creating new Job() and Host()
-    # instances for every HostQueueEntry object that we instantiate as
-    # multiple HQEs often share the same Job.
-    _instances_by_type_and_id = weakref.WeakValueDictionary()
-    _initialized = False
-
-
-    def __new__(cls, id=None, **kwargs):
-        """
-        Look to see if we already have an instance for this particular type
-        and id.  If so, use it instead of creating a duplicate instance.
-        """
-        if id is not None:
-            instance = cls._instances_by_type_and_id.get((cls, id))
-            if instance:
-                return instance
-        return super(DBObject, cls).__new__(cls, id=id, **kwargs)
-
-
-    def __init__(self, id=None, row=None, new_record=False, always_query=True):
-        assert bool(id) or bool(row)
-        if id is not None and row is not None:
-            assert id == row[0]
-        assert self._table_name, '_table_name must be defined in your class'
-        assert self._fields, '_fields must be defined in your class'
-        if not new_record:
-            if self._initialized and not always_query:
-                return  # We've already been initialized.
-            if id is None:
-                id = row[0]
-            # Tell future constructors to use us instead of re-querying while
-            # this instance is still around.
-            self._instances_by_type_and_id[(type(self), id)] = self
-
-        self.__table = self._table_name
-
-        self.__new_record = new_record
-
-        if row is None:
-            row = self._fetch_row_from_db(id)
-
-        if self._initialized:
-            differences = self._compare_fields_in_row(row)
-            if differences:
-                logging.warning(
-                    'initialized %s %s instance requery is updating: %s',
-                    type(self), self.id, differences)
-        self._update_fields_from_row(row)
-        self._initialized = True
-
-
-    @classmethod
-    def _clear_instance_cache(cls):
-        """Used for testing, clear the internal instance cache."""
-        cls._instances_by_type_and_id.clear()
-
-
-    def _fetch_row_from_db(self, row_id):
-        fields = ', '.join(self._fields)
-        sql = 'SELECT %s FROM %s WHERE ID=%%s' % (fields, self.__table)
-        rows = _db.execute(sql, (row_id,))
-        if not rows:
-            raise DBError("row not found (table=%s, row id=%s)"
-                          % (self.__table, row_id))
-        return rows[0]
-
-
-    def _assert_row_length(self, row):
-        assert len(row) == len(self._fields), (
-            "table = %s, row = %s/%d, fields = %s/%d" % (
-            self.__table, row, len(row), self._fields, len(self._fields)))
-
-
-    def _compare_fields_in_row(self, row):
-        """
-        Given a row as returned by a SELECT query, compare it to our existing in
-        memory fields.  Fractional seconds are stripped from datetime values
-        before comparison.
-
-        @param row - A sequence of values corresponding to fields named in
-                The class attribute _fields.
-
-        @returns A dictionary listing the differences keyed by field name
-                containing tuples of (current_value, row_value).
-        """
-        self._assert_row_length(row)
-        differences = {}
-        for field, row_value in itertools.izip(self._fields, row):
-            current_value = getattr(self, field)
-            if (isinstance(current_value, datetime.datetime)
-                and isinstance(row_value, datetime.datetime)):
-                current_value = current_value.strftime(time_utils.TIME_FMT)
-                row_value = row_value.strftime(time_utils.TIME_FMT)
-            if current_value != row_value:
-                differences[field] = (current_value, row_value)
-        return differences
-
-
-    def _update_fields_from_row(self, row):
-        """
-        Update our field attributes using a single row returned by SELECT.
-
-        @param row - A sequence of values corresponding to fields named in
-                the class fields list.
-        """
-        self._assert_row_length(row)
-
-        self._valid_fields = set()
-        for field, value in itertools.izip(self._fields, row):
-            setattr(self, field, value)
-            self._valid_fields.add(field)
-
-        self._valid_fields.remove('id')
-
-
-    def update_from_database(self):
-        assert self.id is not None
-        row = self._fetch_row_from_db(self.id)
-        self._update_fields_from_row(row)
-
-
-    def count(self, where, table = None):
-        if not table:
-            table = self.__table
-
-        rows = _db.execute("""
-                SELECT count(*) FROM %s
-                WHERE %s
-        """ % (table, where))
-
-        assert len(rows) == 1
-
-        return int(rows[0][0])
-
-
-    def update_field(self, field, value):
-        assert field in self._valid_fields
-
-        if getattr(self, field) == value:
-            return
-
-        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
-        _db.execute(query, (value, self.id))
-
-        setattr(self, field, value)
-
-
-    def save(self):
-        if self.__new_record:
-            keys = self._fields[1:] # avoid id
-            columns = ','.join([str(key) for key in keys])
-            values = []
-            for key in keys:
-                value = getattr(self, key)
-                if value is None:
-                    values.append('NULL')
-                else:
-                    values.append('"%s"' % value)
-            values_str = ','.join(values)
-            query = ('INSERT INTO %s (%s) VALUES (%s)' %
-                     (self.__table, columns, values_str))
-            _db.execute(query)
-            # Update our id to the one the database just assigned to us.
-            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
-
-
-    def delete(self):
-        self._instances_by_type_and_id.pop((type(self), id), None)
-        self._initialized = False
-        self._valid_fields.clear()
-        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
-        _db.execute(query, (self.id,))
-
-
-    @staticmethod
-    def _prefix_with(string, prefix):
-        if string:
-            string = prefix + string
-        return string
-
-
-    @classmethod
-    def fetch_rows(cls, where='', params=(), joins='', order_by=''):
-        """
-        Fetch the rows based on the given database query.
-
-        @yields the rows fetched by the given query.
-        """
-        order_by = cls._prefix_with(order_by, 'ORDER BY ')
-        where = cls._prefix_with(where, 'WHERE ')
-        fields = []
-        for field in cls._fields:
-            fields.append('%s.%s' % (cls._table_name, field))
-
-        query = ('SELECT %(fields)s FROM %(table)s %(joins)s '
-                 '%(where)s %(order_by)s' % {'fields' : ', '.join(fields),
-                                             'table' : cls._table_name,
-                                             'joins' : joins,
-                                             'where' : where,
-                                             'order_by' : order_by})
-        rows = _db.execute(query, params)
-        return rows
-
-    @classmethod
-    def fetch(cls, where='', params=(), joins='', order_by=''):
-        """
-        Construct instances of our class based on the given database query.
-
-        @yields One class instance for each row fetched.
-        """
-        rows = cls.fetch_rows(where=where, params=params, joins=joins,
-                              order_by=order_by)
-        return [cls(id=row[0], row=row) for row in rows]
-
-
-class IneligibleHostQueue(DBObject):
-    _table_name = 'afe_ineligible_host_queues'
-    _fields = ('id', 'job_id', 'host_id')
-
-
-class AtomicGroup(DBObject):
-    _table_name = 'afe_atomic_groups'
-    _fields = ('id', 'name', 'description', 'max_number_of_machines',
-               'invalid')
-
-
-class Label(DBObject):
-    _table_name = 'afe_labels'
-    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
-               'only_if_needed', 'atomic_group_id')
-
-
-    def __repr__(self):
-        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
-                self.name, self.id, self.atomic_group_id)
-
-
-class Host(DBObject):
-    _table_name = 'afe_hosts'
-    # TODO(ayatane): synch_id is not used, remove after fixing DB.
-    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
-               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty',
-               'leased', 'shard_id', 'lock_reason')
-
-
-    def set_status(self,status):
-        logging.info('%s -> %s', self.hostname, status)
-        self.update_field('status',status)
-
-
-    def _get_labels_with_platform(self, non_static_rows, static_rows):
-        """Helper function to fetch labels & platform for a host."""
-        if not RESPECT_STATIC_LABELS:
-            return non_static_rows
-
-        combined_rows = []
-        replaced_labels = _db.execute(
-                'SELECT label_id FROM afe_replaced_labels')
-        replaced_label_ids = {l[0] for l in replaced_labels}
-
-        # We respect afe_labels more, which means:
-        #   * if non-static labels are replaced, we find its replaced static
-        #   labels from afe_static_labels by label name.
-        #   * if non-static labels are not replaced, we keep it.
-        #   * Drop static labels which don't have reference non-static labels.
-        static_label_names = []
-        for label_id, label_name, is_platform in non_static_rows:
-            if label_id not in replaced_label_ids:
-                combined_rows.append((label_id, label_name, is_platform))
-            else:
-                static_label_names.append(label_name)
-
-        # Only keep static labels who have replaced non-static labels.
-        for label_id, label_name, is_platform in static_rows:
-            if label_name in static_label_names:
-                combined_rows.append((label_id, label_name, is_platform))
-
-        return combined_rows
-
-
-    def platform_and_labels(self):
-        """
-        Returns a tuple (platform_name, list_of_all_label_names).
-        """
-        template = ('SELECT %(label_table)s.id, %(label_table)s.name, '
-                    '%(label_table)s.platform FROM %(label_table)s INNER '
-                    'JOIN %(host_label_table)s '
-                    'ON %(label_table)s.id = %(host_label_table)s.%(column)s '
-                    'WHERE %(host_label_table)s.host_id = %(host_id)s '
-                    'ORDER BY %(label_table)s.name')
-        static_query = template % {
-                'host_label_table': 'afe_static_hosts_labels',
-                'label_table': 'afe_static_labels',
-                'column': 'staticlabel_id',
-                'host_id': self.id
-        }
-        non_static_query = template % {
-                'host_label_table': 'afe_hosts_labels',
-                'label_table': 'afe_labels',
-                'column': 'label_id',
-                'host_id': self.id
-        }
-        non_static_rows = _db.execute(non_static_query)
-        static_rows = _db.execute(static_query)
-
-        rows = self._get_labels_with_platform(non_static_rows, static_rows)
-        platform = None
-        all_labels = []
-        for _, label_name, is_platform in rows:
-            if is_platform:
-                platform = label_name
-            all_labels.append(label_name)
-        return platform, all_labels
-
-
-    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
-
-
-    @classmethod
-    def cmp_for_sort(cls, a, b):
-        """
-        A comparison function for sorting Host objects by hostname.
-
-        This strips any trailing numeric digits, ignores leading 0s and
-        compares hostnames by the leading name and the trailing digits as a
-        number.  If both hostnames do not match this pattern, they are simply
-        compared as lower case strings.
-
-        Example of how hostnames will be sorted:
-
-          alice, host1, host2, host09, host010, host10, host11, yolkfolk
-
-        This hopefully satisfy most people's hostname sorting needs regardless
-        of their exact naming schemes.  Nobody sane should have both a host10
-        and host010 (but the algorithm works regardless).
-        """
-        lower_a = a.hostname.lower()
-        lower_b = b.hostname.lower()
-        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
-        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
-        if match_a and match_b:
-            name_a, number_a_str = match_a.groups()
-            name_b, number_b_str = match_b.groups()
-            number_a = int(number_a_str.lstrip('0'))
-            number_b = int(number_b_str.lstrip('0'))
-            result = cmp((name_a, number_a), (name_b, number_b))
-            if result == 0 and lower_a != lower_b:
-                # If they compared equal above but the lower case names are
-                # indeed different, don't report equality.  abc012 != abc12.
-                return cmp(lower_a, lower_b)
-            return result
-        else:
-            return cmp(lower_a, lower_b)
-
-
-class HostQueueEntry(DBObject):
-    _table_name = 'afe_host_queue_entries'
-    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
-               'active', 'complete', 'deleted', 'execution_subdir',
-               'atomic_group_id', 'aborted', 'started_on', 'finished_on')
-
-    _COMPLETION_COUNT_METRIC = metrics.Counter(
-        'chromeos/autotest/scheduler/hqe_completion_count')
-
-    def __init__(self, id=None, row=None, job_row=None, **kwargs):
-        """
-        @param id: ID field from afe_host_queue_entries table.
-                   Either id or row should be specified for initialization.
-        @param row: The DB row for a particular HostQueueEntry.
-                    Either id or row should be specified for initialization.
-        @param job_row: The DB row for the job of this HostQueueEntry.
-        """
-        assert id or row
-        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
-        self.job = Job(self.job_id, row=job_row)
-
-        if self.host_id:
-            self.host = rdb_lib.get_hosts([self.host_id])[0]
-            self.host.dbg_str = self.get_dbg_str()
-            self.host.metadata = get_job_metadata(self.job)
-        else:
-            self.host = None
-
-
-    @classmethod
-    def clone(cls, template):
-        """
-        Creates a new row using the values from a template instance.
-
-        The new instance will not exist in the database or have a valid
-        id attribute until its save() method is called.
-        """
-        assert isinstance(template, cls)
-        new_row = [getattr(template, field) for field in cls._fields]
-        clone = cls(row=new_row, new_record=True)
-        clone.id = None
-        return clone
-
-
-    @classmethod
-    def fetch(cls, where='', params=(), joins='', order_by=''):
-        """
-        Construct instances of our class based on the given database query.
-
-        @yields One class instance for each row fetched.
-        """
-        # Override the original fetch method to pre-fetch the jobs from the DB
-        # in order to prevent each HQE making separate DB queries.
-        rows = cls.fetch_rows(where=where, params=params, joins=joins,
-                              order_by=order_by)
-        if len(rows) <= 1:
-            return [cls(id=row[0], row=row) for row in rows]
-
-        job_params = ', '.join([str(row[1]) for row in rows])
-        job_rows = Job.fetch_rows(where='id IN (%s)' % (job_params))
-        # Create a Job_id to Job_row match dictionary to match the HQE
-        # to its corresponding job.
-        job_dict = {job_row[0]: job_row for job_row in job_rows}
-        return [cls(id=row[0], row=row, job_row=job_dict.get(row[1]))
-                for row in rows]
-
-
-    def _view_job_url(self):
-        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
-
-
-    def get_labels(self):
-        """
-        Get all labels associated with this host queue entry (either via the
-        meta_host or as a job dependency label).  The labels yielded are not
-        guaranteed to be unique.
-
-        @yields Label instances associated with this host_queue_entry.
-        """
-        if self.meta_host:
-            yield Label(id=self.meta_host, always_query=False)
-        labels = Label.fetch(
-                joins="JOIN afe_jobs_dependency_labels AS deps "
-                      "ON (afe_labels.id = deps.label_id)",
-                where="deps.job_id = %d" % self.job.id)
-        for label in labels:
-            yield label
-
-
-    def set_host(self, host):
-        if host:
-            logging.info('Assigning host %s to entry %s', host.hostname, self)
-            self.update_field('host_id', host.id)
-            self.block_host(host.id)
-        else:
-            logging.info('Releasing host from %s', self)
-            self.unblock_host(self.host.id)
-            self.update_field('host_id', None)
-
-        self.host = host
-
-
-    def block_host(self, host_id):
-        logging.info("creating block %s/%s", self.job.id, host_id)
-        row = [0, self.job.id, host_id]
-        block = IneligibleHostQueue(row=row, new_record=True)
-        block.save()
-
-
-    def unblock_host(self, host_id):
-        logging.info("removing block %s/%s", self.job.id, host_id)
-        blocks = IneligibleHostQueue.fetch(
-            'job_id=%d and host_id=%d' % (self.job.id, host_id))
-        for block in blocks:
-            block.delete()
-
-
-    def set_execution_subdir(self, subdir=None):
-        if subdir is None:
-            assert self.host
-            subdir = self.host.hostname
-        self.update_field('execution_subdir', subdir)
-
-
-    def _get_hostname(self):
-        if self.host:
-            return self.host.hostname
-        return 'no host'
-
-
-    def get_dbg_str(self):
-        """Get a debug string to identify this host.
-
-        @return: A string containing the hqe and job id.
-        """
-        try:
-            return 'HQE: %s, for job: %s' % (self.id, self.job_id)
-        except AttributeError as e:
-            return 'HQE has not been initialized yet: %s' % e
-
-
-    def __str__(self):
-        flags = []
-        if self.active:
-            flags.append('active')
-        if self.complete:
-            flags.append('complete')
-        if self.deleted:
-            flags.append('deleted')
-        if self.aborted:
-            flags.append('aborted')
-        flags_str = ','.join(flags)
-        if flags_str:
-            flags_str = ' [%s]' % flags_str
-        return ("%s and host: %s has status:%s%s" %
-                (self.get_dbg_str(), self._get_hostname(), self.status,
-                 flags_str))
-
-
-    def set_status(self, status):
-        logging.info("%s -> %s", self, status)
-
-        self.update_field('status', status)
-
-        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
-        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
-
-        self.update_field('active', active)
-
-        # The ordering of these operations is important. Once we set the
-        # complete bit this job will become indistinguishable from all
-        # the other complete jobs, unless we first set shard_id to NULL
-        # to signal to the shard_client that we need to upload it. However,
-        # we can only set both these after we've updated finished_on etc
-        # within _on_complete or the job will get synced in an intermediate
-        # state. This means that if someone sigkills the scheduler between
-        # setting finished_on and complete, we will have inconsistent jobs.
-        # This should be fine, because nothing critical checks finished_on,
-        # and the scheduler should never be killed mid-tick.
-        if complete:
-            self._on_complete(status)
-            self._email_on_job_complete()
-
-        self.update_field('complete', complete)
-
-        should_email_status = (status.lower() in _notify_email_statuses or
-                               'all' in _notify_email_statuses)
-        if should_email_status:
-            self._email_on_status(status)
-        logging.debug('HQE Set Status Complete')
-
-
-    def _on_complete(self, status):
-        metric_fields = {'status': status.lower()}
-        if self.host:
-            metric_fields['board'] = self.host.board or ''
-            if len(self.host.pools) == 1:
-                metric_fields['pool'] = self.host.pools[0]
-            else:
-                metric_fields['pool'] = 'MULTIPLE'
-        else:
-            metric_fields['board'] = 'NO_HOST'
-            metric_fields['pool'] = 'NO_HOST'
-        self._COMPLETION_COUNT_METRIC.increment(fields=metric_fields)
-        if status is not models.HostQueueEntry.Status.ABORTED:
-            self.job.stop_if_necessary()
-        if self.started_on:
-            self.set_finished_on_now()
-            self._log_trace()
-        if self.job.shard_id is not None:
-            # If shard_id is None, the job will be synced back to the master
-            self.job.update_field('shard_id', None)
-        if not self.execution_subdir:
-            return
-        # unregister any possible pidfiles associated with this queue entry
-        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
-            pidfile_id = _drone_manager.get_pidfile_id_from(
-                    self.execution_path(), pidfile_name=pidfile_name)
-            _drone_manager.unregister_pidfile(pidfile_id)
-
-    def _log_trace(self):
-        """Emits a Cloud Trace span for the HQE's duration."""
-        if self.started_on and self.finished_on:
-            span = cloud_trace.Span('HQE', spanId='0',
-                                    traceId=hqe_trace_id(self.id))
-            # TODO(phobbs) make a .SetStart() and .SetEnd() helper method
-            span.startTime = types.Timestamp()
-            span.startTime.FromDatetime(self.started_on)
-            span.endTime = types.Timestamp()
-            span.endTime.FromDatetime(self.finished_on)
-            # TODO(phobbs) any LogSpan calls need to be wrapped in this for
-            # safety during tests, so this should be caught within LogSpan.
-            try:
-                cloud_trace.LogSpan(span)
-            except IOError as e:
-                if e.errno == errno.ENOENT:
-                    logging.warning('Error writing to cloud trace results '
-                                    'directory: %s', e)
-
-
-    def _get_status_email_contents(self, status, summary=None, hostname=None):
-        """
-        Gather info for the status notification e-mails.
-
-        If needed, we could start using the Django templating engine to create
-        the subject and the e-mail body, but that doesn't seem necessary right
-        now.
-
-        @param status: Job status text. Mandatory.
-        @param summary: Job summary text. Optional.
-        @param hostname: A hostname for the job. Optional.
-
-        @return: Tuple (subject, body) for the notification e-mail.
-        """
-        job_stats = Job(id=self.job.id).get_execution_details()
-
-        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
-                   (self.job.id, self.job.name, status))
-
-        if hostname is not None:
-            subject += '| Hostname: %s ' % hostname
-
-        if status not in ["1 Failed", "Failed"]:
-            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']
-
-        body =  "Job ID: %s\n" % self.job.id
-        body += "Job name: %s\n" % self.job.name
-        if hostname is not None:
-            body += "Host: %s\n" % hostname
-        if summary is not None:
-            body += "Summary: %s\n" % summary
-        body += "Status: %s\n" % status
-        body += "Results interface URL: %s\n" % self._view_job_url()
-        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
-        if int(job_stats['total_executed']) > 0:
-            body += "User tests executed: %s\n" % job_stats['total_executed']
-            body += "User tests passed: %s\n" % job_stats['total_passed']
-            body += "User tests failed: %s\n" % job_stats['total_failed']
-            body += ("User tests success rate: %.2f %%\n" %
-                     job_stats['success_rate'])
-
-        if job_stats['failed_rows']:
-            body += "Failures:\n"
-            body += job_stats['failed_rows']
-
-        return subject, body
-
-
-    def _email_on_status(self, status):
-        # TODO(crbug.com/1033823): Deleted
-        pass
-
-
-    def _email_on_job_complete(self):
-        # TODO(crbug.com/1033823): Deleted
-        pass
-
-
-    def schedule_pre_job_tasks(self):
-        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
-                     self.job.name, self.meta_host, self.atomic_group_id,
-                     self.job.id, self.id, self.host.hostname, self.status)
-
-        self._do_schedule_pre_job_tasks()
-
-
-    def _do_schedule_pre_job_tasks(self):
-        self.job.schedule_pre_job_tasks(queue_entry=self)
-
-
-    def requeue(self):
-        assert self.host
-        self.set_status(models.HostQueueEntry.Status.QUEUED)
-        self.update_field('started_on', None)
-        self.update_field('finished_on', None)
-        # verify/cleanup failure sets the execution subdir, so reset it here
-        self.set_execution_subdir('')
-        if self.meta_host:
-            self.set_host(None)
-
-
-    @property
-    def aborted_by(self):
-        self._load_abort_info()
-        return self._aborted_by
-
-
-    @property
-    def aborted_on(self):
-        self._load_abort_info()
-        return self._aborted_on
-
-
-    def _load_abort_info(self):
-        """ Fetch info about who aborted the job. """
-        if hasattr(self, "_aborted_by"):
-            return
-        rows = _db.execute("""
-                SELECT afe_users.login,
-                        afe_aborted_host_queue_entries.aborted_on
-                FROM afe_aborted_host_queue_entries
-                INNER JOIN afe_users
-                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
-                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
-                """, (self.id,))
-        if rows:
-            self._aborted_by, self._aborted_on = rows[0]
-        else:
-            self._aborted_by = self._aborted_on = None
-
-
-    def on_pending(self):
-        """
-        Called when an entry in a synchronous job has passed verify.  If the
-        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
-        them in PENDING.
-        """
-        self.set_status(models.HostQueueEntry.Status.PENDING)
-        if not self.host:
-            raise scheduler_lib.NoHostIdError(
-                    'Failed to recover a job whose host_queue_entry_id=%r due'
-                    ' to no host_id.'
-                    % self.id)
-        self.host.set_status(models.Host.Status.PENDING)
-
-        # Some debug code here: sends an email if an asynchronous job does not
-        # immediately enter Starting.
-        # TODO: Remove this once we figure out why asynchronous jobs are getting
-        # stuck in Pending.
-        self.job.run_if_ready(queue_entry=self)
-
-
-    def abort(self, dispatcher):
-        assert self.aborted and not self.complete
-
-        Status = models.HostQueueEntry.Status
-        if self.status in {Status.GATHERING, Status.PARSING}:
-            # do nothing; post-job tasks will finish and then mark this entry
-            # with status "Aborted" and take care of the host
-            return
-
-        if self.status in {Status.STARTING, Status.PENDING, Status.RUNNING}:
-            # If hqe is in any of these status, it should not have any
-            # unfinished agent before it can be aborted.
-            agents = dispatcher.get_agents_for_entry(self)
-            # Agent with finished task can be left behind. This is added to
-            # handle the special case of aborting hostless job in STARTING
-            # status, in which the agent has only a HostlessQueueTask
-            # associated. The finished HostlessQueueTask will be cleaned up in
-            # the next tick, so it's safe to leave the agent there. Without
-            # filtering out finished agent, HQE abort won't be able to proceed.
-            assert all([agent.is_done() for agent in agents])
-
-        if self.host:
-            if self.status in {Status.STARTING, Status.PENDING, Status.RUNNING}:
-                self.host.set_status(models.Host.Status.READY)
-            elif self.status in {Status.VERIFYING, Status.RESETTING}:
-                models.SpecialTask.objects.create(
-                        task=models.SpecialTask.Task.CLEANUP,
-                        host=models.Host.objects.get(id=self.host.id),
-                        requested_by=self.job.owner_model())
-            elif self.status == Status.PROVISIONING:
-                models.SpecialTask.objects.create(
-                        task=models.SpecialTask.Task.REPAIR,
-                        host=models.Host.objects.get(id=self.host.id),
-                        requested_by=self.job.owner_model())
-
-        self.set_status(Status.ABORTED)
-
-
-    def execution_tag(self):
-        SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
-                               'complete!=1 AND execution_subdir="" AND '
-                               'status!="Queued";')
-        SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
-                                 'status="Aborted" WHERE id=%s;')
-        try:
-            assert self.execution_subdir
-        except AssertionError:
-            # TODO(scottz): Remove temporary fix/info gathering pathway for
-            # crosbug.com/31595 once issue is root caused.
-            logging.error('No execution_subdir for host queue id:%s.', self.id)
-            logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
-            for row in _db.execute(SQL_SUSPECT_ENTRIES):
-                logging.error(row)
-            logging.error('====DB DEBUG====\n')
-            fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
-            logging.error('EXECUTING: %s', fix_query)
-            _db.execute(SQL_FIX_SUSPECT_ENTRY % self.id)
-            raise AssertionError(('self.execution_subdir not found. '
-                                  'See log for details.'))
-
-        return "%s/%s" % (self.job.tag(), self.execution_subdir)
-
-
-    def execution_path(self):
-        return self.execution_tag()
-
-
-    def set_started_on_now(self):
-        self.update_field('started_on', datetime.datetime.now())
-
-
-    def set_finished_on_now(self):
-        self.update_field('finished_on', datetime.datetime.now())
-
-
-    def is_hostless(self):
-        return (self.host_id is None
-                and self.meta_host is None)
-
-
-def hqe_trace_id(hqe_id):
-    """Constructs the canonical trace id based on the HQE's id.
-
-    Encodes 'HQE' in base16 and concatenates with the hex representation
-    of the HQE's id.
-
-    @param hqe_id: The HostQueueEntry's id.
-
-    Returns:
-        A trace id (in hex format)
-    """
-    return base64.b16encode('HQE') + hex(hqe_id)[2:]
-
-
-class Job(DBObject):
-    _table_name = 'afe_jobs'
-    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
-               'control_type', 'created_on', 'synch_count', 'timeout',
-               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
-               'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
-               'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
-               'test_retry', 'run_reset', 'timeout_mins', 'shard_id',
-               'require_ssp')
-
-    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
-    # all status='Pending' atomic group HQEs incase a delay was running when the
-    # scheduler was restarted and no more hosts ever successfully exit Verify.
-
-    def __init__(self, id=None, row=None, **kwargs):
-        assert id or row
-        super(Job, self).__init__(id=id, row=row, **kwargs)
-        self._owner_model = None # caches model instance of owner
-        self.update_image_path = None # path of OS image to install
-
-
-    def model(self):
-        return models.Job.objects.get(id=self.id)
-
-
-    def owner_model(self):
-        # work around the fact that the Job owner field is a string, not a
-        # foreign key
-        if not self._owner_model:
-            self._owner_model = models.User.objects.get(login=self.owner)
-        return self._owner_model
-
-
-    def tag(self):
-        return "%s-%s" % (self.id, self.owner)
-
-
-    def get_execution_details(self):
-        """
-        Get test execution details for this job.
-
-        @return: Dictionary with test execution details
-        """
-        def _find_test_jobs(rows):
-            """
-            Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
-            Those are autotest 'internal job' tests, so they should not be
-            counted when evaluating the test stats.
-
-            @param rows: List of rows (matrix) with database results.
-            """
-            job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')
-            n_test_jobs = 0
-            for r in rows:
-                test_name = r[0]
-                if job_test_pattern.match(test_name):
-                    n_test_jobs += 1
-
-            return n_test_jobs
-
-        stats = {}
-
-        rows = _db.execute("""
-                SELECT t.test, s.word, t.reason
-                FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
-                WHERE t.job_idx = j.job_idx
-                AND s.status_idx = t.status
-                AND j.afe_job_id = %s
-                ORDER BY t.reason
-                """ % self.id)
-
-        failed_rows = [r for r in rows if not r[1] == 'GOOD']
-
-        n_test_jobs = _find_test_jobs(rows)
-        n_test_jobs_failed = _find_test_jobs(failed_rows)
-
-        total_executed = len(rows) - n_test_jobs
-        total_failed = len(failed_rows) - n_test_jobs_failed
-
-        if total_executed > 0:
-            success_rate = 100 - ((total_failed / float(total_executed)) * 100)
-        else:
-            success_rate = 0
-
-        stats['total_executed'] = total_executed
-        stats['total_failed'] = total_failed
-        stats['total_passed'] = total_executed - total_failed
-        stats['success_rate'] = success_rate
-
-        status_header = ("Test Name", "Status", "Reason")
-        if failed_rows:
-            stats['failed_rows'] = utils.matrix_to_string(failed_rows,
-                                                          status_header)
-        else:
-            stats['failed_rows'] = ''
-
-        time_row = _db.execute("""
-                   SELECT started_time, finished_time
-                   FROM tko_jobs
-                   WHERE afe_job_id = %s
-                   """ % self.id)
-
-        if time_row:
-            t_begin, t_end = time_row[0]
-            try:
-                delta = t_end - t_begin
-                minutes, seconds = divmod(delta.seconds, 60)
-                hours, minutes = divmod(minutes, 60)
-                stats['execution_time'] = ("%02d:%02d:%02d" %
-                                           (hours, minutes, seconds))
-            # One of t_end or t_begin are None
-            except TypeError:
-                stats['execution_time'] = '(could not determine)'
-        else:
-            stats['execution_time'] = '(none)'
-
-        return stats
-
-
-    def keyval_dict(self):
-        return self.model().keyval_dict()
-
-
-    def _pending_count(self):
-        """The number of HostQueueEntries for this job in the Pending state."""
-        pending_entries = models.HostQueueEntry.objects.filter(
-                job=self.id, status=models.HostQueueEntry.Status.PENDING)
-        return pending_entries.count()
-
-
-    def is_ready(self):
-        pending_count = self._pending_count()
-        ready = (pending_count >= self.synch_count)
-
-        if not ready:
-            logging.info(
-                    'Job %s not ready: %s pending, %s required ',
-                    self, pending_count, self.synch_count)
-
-        return ready
-
-
-    def num_machines(self, clause = None):
-        sql = "job_id=%s" % self.id
-        if clause:
-            sql += " AND (%s)" % clause
-        return self.count(sql, table='afe_host_queue_entries')
-
-
-    def num_queued(self):
-        return self.num_machines('not complete')
-
-
-    def num_active(self):
-        return self.num_machines('active')
-
-
-    def num_complete(self):
-        return self.num_machines('complete')
-
-
-    def is_finished(self):
-        return self.num_complete() == self.num_machines()
-
-
-    def _not_yet_run_entries(self, include_active=True):
-        if include_active:
-          statuses = list(models.HostQueueEntry.PRE_JOB_STATUSES)
-        else:
-          statuses = list(models.HostQueueEntry.IDLE_PRE_JOB_STATUSES)
-        return models.HostQueueEntry.objects.filter(job=self.id,
-                                                    status__in=statuses)
-
-
-    def _stop_all_entries(self):
-        """Stops the job's inactive pre-job HQEs."""
-        entries_to_stop = self._not_yet_run_entries(
-            include_active=False)
-        for child_entry in entries_to_stop:
-            assert not child_entry.complete, (
-                '%s status=%s, active=%s, complete=%s' %
-                (child_entry.id, child_entry.status, child_entry.active,
-                 child_entry.complete))
-            if child_entry.status == models.HostQueueEntry.Status.PENDING:
-                child_entry.host.status = models.Host.Status.READY
-                child_entry.host.save()
-            child_entry.status = models.HostQueueEntry.Status.STOPPED
-            child_entry.save()
-
-
-    def stop_if_necessary(self):
-        not_yet_run = self._not_yet_run_entries()
-        if not_yet_run.count() < self.synch_count:
-            self._stop_all_entries()
-
-
-    def _next_group_name(self):
-        """@returns a directory name to use for the next host group results."""
-        group_count_re = re.compile(r'group(\d+)')
-        query = models.HostQueueEntry.objects.filter(
-            job=self.id).values('execution_subdir').distinct()
-        subdirs = (entry['execution_subdir'] for entry in query)
-        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
-        ids = [int(match.group(1)) for match in group_matches if match]
-        if ids:
-            next_id = max(ids) + 1
-        else:
-            next_id = 0
-        return 'group%d' % (next_id,)
-
-
-    def get_group_entries(self, queue_entry_from_group):
-        """
-        @param queue_entry_from_group: A HostQueueEntry instance to find other
-                group entries on this job for.
-
-        @returns A list of HostQueueEntry objects all executing this job as
-                part of the same group as the one supplied (having the same
-                execution_subdir).
-        """
-        execution_subdir = queue_entry_from_group.execution_subdir
-        return list(HostQueueEntry.fetch(
-            where='job_id=%s AND execution_subdir=%s',
-            params=(self.id, execution_subdir)))
-
-
-    def _should_run_cleanup(self, queue_entry):
-        if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
-            return True
-        elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
-            return queue_entry.host.dirty
-        return False
-
-
-    def _should_run_verify(self, queue_entry):
-        do_not_verify = (queue_entry.host.protection ==
-                         host_protections.Protection.DO_NOT_VERIFY)
-        if do_not_verify:
-            return False
-        # If RebootBefore is set to NEVER, then we won't run reset because
-        # we can't cleanup, so we need to weaken a Reset into a Verify.
-        weaker_reset = (self.run_reset and
-                self.reboot_before == model_attributes.RebootBefore.NEVER)
-        return self.run_verify or weaker_reset
-
-
-    def _should_run_reset(self, queue_entry):
-        can_verify = (queue_entry.host.protection !=
-                         host_protections.Protection.DO_NOT_VERIFY)
-        can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER
-        return (can_reboot and can_verify
-                and (self.run_reset
-                     or (self._should_run_cleanup(queue_entry)
-                         and self._should_run_verify(queue_entry))))
-
-
-    def _should_run_provision(self, queue_entry):
-        """
-        Determine if the queue_entry needs to have a provision task run before
-        it to provision queue_entry.host.
-
-        @param queue_entry: The host queue entry in question.
-        @returns: True if we should schedule a provision task, False otherwise.
-
-        """
-        # If we get to this point, it means that the scheduler has already
-        # vetted that all the unprovisionable labels match, so we can just
-        # find all labels on the job that aren't on the host to get the list
-        # of what we need to provision.  (See the scheduling logic in
-        # host_scheduler.py:is_host_eligable_for_job() where we discard all
-        # actionable labels when assigning jobs to hosts.)
-        job_labels = {x.name for x in queue_entry.get_labels()}
-        # Skip provision if `skip_provision` is listed in the job labels.
-        if provision.SKIP_PROVISION in job_labels:
-            return False
-        _, host_labels = queue_entry.host.platform_and_labels()
-        # If there are any labels on the job that are not on the host and they
-        # are labels that provisioning knows how to change, then that means
-        # there is provisioning work to do.  If there's no provisioning work to
-        # do, then obviously we have no reason to schedule a provision task!
-        diff = job_labels - set(host_labels)
-        if any([provision.Provision.acts_on(x) for x in diff]):
-            return True
-        return False
-
-
-    def _queue_special_task(self, queue_entry, task):
-        """
-        Create a special task and associate it with a host queue entry.
-
-        @param queue_entry: The queue entry this special task should be
-                            associated with.
-        @param task: One of the members of the enum models.SpecialTask.Task.
-        @returns: None
-
-        """
-        models.SpecialTask.objects.create(
-                host=models.Host.objects.get(id=queue_entry.host_id),
-                queue_entry=queue_entry, task=task)
-
-
-    def schedule_pre_job_tasks(self, queue_entry):
-        """
-        Queue all of the special tasks that need to be run before a host
-        queue entry may run.
-
-        If no special taskes need to be scheduled, then |on_pending| will be
-        called directly.
-
-        @returns None
-
-        """
-        task_queued = False
-        hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id)
-
-        if self._should_run_provision(queue_entry):
-            self._queue_special_task(hqe_model,
-                                     models.SpecialTask.Task.PROVISION)
-            task_queued = True
-        elif self._should_run_reset(queue_entry):
-            self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET)
-            task_queued = True
-        else:
-            if self._should_run_cleanup(queue_entry):
-                self._queue_special_task(hqe_model,
-                                         models.SpecialTask.Task.CLEANUP)
-                task_queued = True
-            if self._should_run_verify(queue_entry):
-                self._queue_special_task(hqe_model,
-                                         models.SpecialTask.Task.VERIFY)
-                task_queued = True
-
-        if not task_queued:
-            queue_entry.on_pending()
-
-
-    def _assign_new_group(self, queue_entries):
-        if len(queue_entries) == 1:
-            group_subdir_name = queue_entries[0].host.hostname
-        else:
-            group_subdir_name = self._next_group_name()
-            logging.info('Running synchronous job %d hosts %s as %s',
-                self.id, [entry.host.hostname for entry in queue_entries],
-                group_subdir_name)
-
-        for queue_entry in queue_entries:
-            queue_entry.set_execution_subdir(group_subdir_name)
-
-
-    def _choose_group_to_run(self, include_queue_entry):
-        """
-        @returns A tuple containing a list of HostQueueEntry instances to be
-                used to run this Job, a string group name to suggest giving
-                to this job in the results database.
-        """
-        chosen_entries = [include_queue_entry]
-        num_entries_wanted = self.synch_count
-        num_entries_wanted -= len(chosen_entries)
-
-        if num_entries_wanted > 0:
-            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
-            pending_entries = list(HostQueueEntry.fetch(
-                     where=where_clause,
-                     params=(self.id, include_queue_entry.id)))
-
-            # Sort the chosen hosts by hostname before slicing.
-            def cmp_queue_entries_by_hostname(entry_a, entry_b):
-                return Host.cmp_for_sort(entry_a.host, entry_b.host)
-            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
-            chosen_entries += pending_entries[:num_entries_wanted]
-
-        # Sanity check.  We'll only ever be called if this can be met.
-        if len(chosen_entries) < self.synch_count:
-            message = ('job %s got less than %s chosen entries: %s' % (
-                    self.id, self.synch_count, chosen_entries))
-            logging.error(message)
-            return []
-
-        self._assign_new_group(chosen_entries)
-        return chosen_entries
-
-
-    def run_if_ready(self, queue_entry):
-        """
-        Run this job by kicking its HQEs into status='Starting' if enough
-        hosts are ready for it to run.
-
-        Cleans up by kicking HQEs into status='Stopped' if this Job is not
-        ready to run.
-        """
-        if not self.is_ready():
-            self.stop_if_necessary()
-        else:
-            self.run(queue_entry)
-
-
-    def request_abort(self):
-        """Request that this Job be aborted on the next scheduler cycle."""
-        self.model().abort()
-
-
-    def run(self, queue_entry):
-        """
-        @param queue_entry: The HostQueueEntry instance calling this method.
-        """
-        queue_entries = self._choose_group_to_run(queue_entry)
-        if queue_entries:
-            self._finish_run(queue_entries)
-
-
-    def _finish_run(self, queue_entries):
-        for queue_entry in queue_entries:
-            queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
-
-
-    def __str__(self):
-        return '%s-%s' % (self.id, self.owner)