[autotest] Lease hosts according to frontend job priorities.

This CL modifies the way we lease hosts by teaching the
RDBServerHostWrapper to handle host leasing. Though this involves
a separate query for each host, it leads to a design we can later
build atomicity into, because we can check the leased bit on a single
host before setting it (see the sketch below). This model of leasing
also has the following benefits:
    1. It doesn't abuse the response map.
    2. It gives us more clarity into which requests are acquiring
        hosts by setting the leased bit in step with host validation.
    3. It is more tolerant of db errors because exceptions raised while
        leasing one host will not fail the entire batched request.
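
The per-host flow looks roughly like the sketch below. It is a
simplified illustration of the wrappers in this CL (see
RDBServerHostWrapper.lease in scheduler/rdb_hosts.py and lease_hosts
in scheduler/rdb.py for the real code); class and helper names here
are illustrative only:

    import logging

    class ServerHostWrapper(object):
        """Simplified stand-in for RDBServerHostWrapper."""

        def __init__(self, db_host):
            self._host = db_host
            self.hostname = db_host.hostname
            self.leased = db_host.leased

        def lease(self):
            # Check the leased bit on this single host before setting it;
            # this per-host check is the hook we can later make atomic.
            if self._host.leased or self.leased:
                raise Exception('Host %s is already leased.' % self.hostname)
            self._host.leased = 1
            self._host.save()
            self.leased = 1

    def lease_hosts(hosts):
        """Lease each host individually; return only the successes."""
        leased = []
        for host in hosts:
            try:
                host.lease()
            except Exception as e:
                # A db error on one host is logged and does not fail
                # the rest of the batch.
                logging.error('Unable to lease host %s: %s',
                              host.hostname, e)
                continue
            leased.append(host)
        return leased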

This CL also adds rdb_unittest and rdb_hosts_unittest modules.

TEST=Unittests, ran suites.
BUG=chromium:353183
DEPLOY=scheduler

Change-Id: I35c04bcb37eee0191a211c133a35824cc78b5d71
Reviewed-on: https://chromium-review.googlesource.com/193182
Reviewed-by: Prashanth B <beeps@chromium.org>
Commit-Queue: Prashanth B <beeps@chromium.org>
Tested-by: Prashanth B <beeps@chromium.org>
diff --git a/scheduler/rdb.py b/scheduler/rdb.py
index 0f6d60b..24d2279 100644
--- a/scheduler/rdb.py
+++ b/scheduler/rdb.py
@@ -61,14 +61,14 @@
         @param acls: A list of acls, at least one of which must coincide with
             an acl group the chosen host is in.
 
-        @return: A list of matching hosts available.
+        @return: A set of matching hosts available.
         """
         hosts_available = self.host_objects.filter(invalid=0)
         queries = [Q(labels__id=dep) for dep in deps]
         queries += [Q(aclgroup__id__in=acls)]
         for query in queries:
             hosts_available = hosts_available.filter(query)
-        return hosts_available
+        return set(hosts_available)
 
 
 class AvailableHostQueryManager(BaseHostQueryManager):
@@ -165,7 +165,7 @@
             try:
                 response = self.host_query_manager.update_hosts(hosts, **payload)
             except (django_exceptions.FieldError,
-                    fields.FieldDoesNotExist) as e:
+                    fields.FieldDoesNotExist, ValueError) as e:
                 for host in hosts:
                     # Since update requests have a consistent hash this will map
                     # to the same key as the original request.
@@ -211,14 +211,52 @@
 
 
     def lease_hosts(self, hosts):
-        """Leases a list hosts.
+        """Leases a list of hosts.
 
-        @param hosts: A list of hosts to lease.
+        @param hosts: A list of RDBServerHostWrapper instances to lease.
+
+        @return: The list of RDBServerHostWrappers that were successfully
+            leased.
         """
-        #TODO(beeps): crbug.com/353183, we're abusing the response_map here.
-        requests = [rdb_requests.UpdateHostRequest(host_id=host.id,
-                payload={'leased': 1}).get_request() for host in hosts]
-        super(AvailableHostRequestHandler, self).update_hosts(requests)
+        #TODO(beeps): crbug.com/353183.
+        unleased_hosts = set(hosts)
+        leased_hosts = set([])
+        for host in unleased_hosts:
+            try:
+                host.lease()
+            except rdb_utils.RDBException as e:
+                logging.error('Unable to lease host %s: %s', host.hostname, e)
+            else:
+                leased_hosts.add(host)
+        return list(leased_hosts)
+
+
+    @classmethod
+    def valid_host_assignment(cls, request, host):
+        """Check if a host, request pairing is valid.
+
+        @param request: The request to match against the host.
+        @param host: An RDBServerHostWrapper instance.
+
+        @return: True if the host, request assignment is valid.
+
+        @raises RDBException: If the request already has a different host_id
+            associated with it.
+        """
+        if request.host_id and request.host_id != host.id:
+            raise rdb_utils.RDBException(
+                    'Cannot assign a different host for request: %s, it '
+                    'already has one: %s ' % (request, host.id))
+
+        # Getting all labels and acls might result in large queries, so
+        # bail early if the host is already leased.
+        if host.leased:
+            return False
+        # If a host is invalid it must be a one time host added to the
+        # afe specifically for this purpose, so it doesn't require acl checking.
+        acl_match = (request.acls.intersection(host.acls) or host.invalid)
+        label_match = (request.deps.intersection(host.labels) == request.deps)
+        return acl_match and label_match
 
 
     @_timer.decorate
@@ -243,9 +281,11 @@
                             request.deps, request.acls)
             num_hosts = min(len(hosts), count)
             if num_hosts:
-                # TODO(beeps): Only reserve hosts we have successfully leased.
-                self.lease_hosts(hosts[:num_hosts])
-                self.update_response_map(request, hosts[:num_hosts])
+                # Try leasing num_hosts hosts, but only update the response
+                # map with those that we could successfully lease.
+                leased_hosts = self.lease_hosts(hosts[:num_hosts])
+                self.update_response_map(request, leased_hosts)
+                num_hosts = len(leased_hosts)
             if num_hosts < count:
                 logging.warning('%s Unsatisfied rdb acquisition request:%s ',
                                 count-num_hosts, request)
@@ -259,29 +299,39 @@
         request-host pairings. Lease the remaining hosts.
 
         @param requests: A list of requests to validate.
+
+        @raises RDBException: If multiple hosts or the wrong host is returned
+            for a response.
         """
-        # Multiple requests can have the same host (but different acls/deps),
-        # and multiple jobs can submit identical requests (same host_id,
-        # acls, deps). In both these cases the first request to check the host
-        # map wins, though in the second case it doesn't matter.
+        # The following cases are possible for frontend requests:
+        # 1. Multiple requests for 1 host, with different acls/deps/priority:
+        #    These form distinct requests because they hash differently.
+        #    The response map will contain entries like: {r1: h1, r2: h1}
+        #    after the batch_get_hosts call. There are 2 sub-cases:
+        #        a. Same deps/acls, different priority:
+        #           Since we sort the requests based on priority, the
+        #           higher priority request r1, will lease h1. The
+        #           validation of r2, h1 will fail because of the r1 lease.
+        #        b. Different deps/acls, only one of which matches the host:
+        #           The matching request will lease h1. The other host
+        #           pairing will get dropped from the response map.
+        # 2. Multiple requests with the same acls/deps/priority and 1 host:
+        #    These all have the same request hash, so the response map will
+        #    contain: {r: h}, regardless of the number of r's. If this is not
+        #    a valid host assignment it will get dropped from the response.
         self.batch_get_hosts(set(requests))
-        for request in self.response_map.keys():
+        for request in sorted(self.response_map.keys(),
+                key=lambda request: request.priority, reverse=True):
             hosts = self.response_map[request]
             if len(hosts) > 1:
                 raise rdb_utils.RDBException('Got multiple hosts for a single '
                         'request. Hosts: %s, request %s.' % (hosts, request))
-            host = hosts[0]
-            if not ((request.acls.intersection(host.acls) or host.invalid) and
-                    request.deps.intersection(host.labels) == request.deps):
-                if request.host_id != host.id:
-                    raise rdb_utils.RDBException('Cannot assign a different '
-                            'host for requset: %s, it already has one: %s ' %
-                            (request, host.id))
-                del self.response_map[request]
-                logging.warning('Failed rdb validation request:%s ', request)
-
-        # TODO(beeps): Update acquired hosts with failed leases.
-        self.lease_hosts([hosts[0] for hosts in self.response_map.values()])
+            if (self.valid_host_assignment(request, hosts[0]) and
+                    self.lease_hosts(hosts)):
+                continue
+            del self.response_map[request]
+            logging.warning('Request %s was not able to lease host %s',
+                            request, hosts[0])
 
 
 # Request dispatchers: Create the appropriate request handler, send a list
diff --git a/scheduler/rdb_hosts.py b/scheduler/rdb_hosts.py
index 203eda0..9f1f8b4 100644
--- a/scheduler/rdb_hosts.py
+++ b/scheduler/rdb_hosts.py
@@ -85,17 +85,36 @@
     """
 
     def __init__(self, host):
+        """Create an RDBServerHostWrapper.
+
+        @param host: An instance of the Host model class.
+        """
         host_fields = RDBHost.get_required_fields_from_host(host)
         super(RDBServerHostWrapper, self).__init__(**host_fields)
-        self.labels = rdb_utils.LabelIterator(
-                (label for label in host.labels.all()))
-        self.acls = rdb_utils.RememberingIterator(
-                (acl.id for acl in host.aclgroup_set.all()))
+        self.labels = rdb_utils.LabelIterator(host.labels.all())
+        self.acls = [aclgroup.id for aclgroup in host.aclgroup_set.all()]
         self.protection = host.protection
         platform = host.platform()
         # Platform needs to be a method, not an attribute, for
         # backwards compatibility with the rest of the host model.
         self.platform_name = platform.name if platform else None
+        self._host = host
+
+
+    def lease(self):
+        """Set the leased bit on the host object, and in the database.
+
+        @raises RDBException: If the host is already leased, or the
+            RDBServerHostWrapper leased bit is set.
+        """
+        if self._host.leased or self.leased:
+            raise rdb_utils.RDBException(
+                    'Error leasing host %s, RDBServerHostWrapper leased: %s, '
+                    'Host db object leased: %s' %
+                    (self.hostname, self.leased, self._host.leased))
+        self._host.leased = 1
+        self._host.save()
+        self.leased = 1
 
 
     def wire_format(self, unwrap_foreign_keys=True):
@@ -110,8 +129,8 @@
         host_info = super(RDBServerHostWrapper, self).wire_format()
 
         if unwrap_foreign_keys:
-            host_info['labels'] = self.labels.get_all_items()
-            host_info['acls'] = self.acls.get_all_items()
+            host_info['labels'] = self.labels.get_label_names()
+            host_info['acls'] = self.acls
             host_info['platform_name'] = self.platform_name
             host_info['protection'] = self.protection
         return host_info
diff --git a/scheduler/rdb_hosts_unittest.py b/scheduler/rdb_hosts_unittest.py
new file mode 100644
index 0000000..ce57269
--- /dev/null
+++ b/scheduler/rdb_hosts_unittest.py
@@ -0,0 +1,168 @@
+# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import unittest
+
+import common
+from autotest_lib.client.common_lib.test_utils import unittest
+from autotest_lib.frontend import setup_django_environment
+from autotest_lib.frontend.afe import frontend_test_utils
+from autotest_lib.frontend.afe import rdb_model_extensions as rdb_models
+from autotest_lib.scheduler import rdb_hosts
+from autotest_lib.scheduler import rdb_integration_tests
+from autotest_lib.scheduler import rdb_utils
+
+
+class RDBHostTests(unittest.TestCase, frontend_test_utils.FrontendTestMixin):
+    """Unittests for RDBHost objects."""
+
+    def setUp(self):
+        self.db_helper = rdb_integration_tests.DBHelper()
+        self._database = self.db_helper.database
+        # Runs syncdb setting up initial database conditions
+        self._frontend_common_setup()
+
+
+    def tearDown(self):
+        self._database.disconnect()
+        self._frontend_common_teardown()
+
+
+    def testWireFormat(self):
+        """Test that we can create a client host with the server host's fields.
+
+        Get the wire_format fields of an RDBServerHostWrapper and use them to
+        create an RDBClientHostWrapper.
+
+        @raises AssertionError: If the labels and acls don't match up after
+            going through the complete wire_format conversion, or if the bare
+            wire_format conversion also converts labels and acls.
+        @raises RDBException: If some critical fields were lost during
+            wire_format conversion, as we won't be able to construct the
+            RDBClientHostWrapper.
+        """
+        labels = set(['a', 'b', 'c'])
+        acls = set(['d', 'e'])
+        server_host = rdb_hosts.RDBServerHostWrapper(
+                self.db_helper.create_host('h1', deps=labels, acls=acls))
+        acl_ids = set([aclgroup.id for aclgroup in
+                   self.db_helper.get_acls(name__in=acls)])
+        label_ids = set([label.id for label in
+                         self.db_helper.get_labels(name__in=labels)])
+
+        # The RDBServerHostWrapper keeps ids of labels/acls to perform
+        # comparison operations within the rdb, but converts labels to
+        # strings because this is the format the scheduler expects them in.
+        self.assertTrue(set(server_host.labels) == label_ids and
+                        set(server_host.acls) == acl_ids)
+
+        formatted_server_host = server_host.wire_format()
+        client_host = rdb_hosts.RDBClientHostWrapper(**formatted_server_host)
+        self.assertTrue(set(client_host.labels) == labels and
+                        set(client_host.acls) == acl_ids)
+        bare_formatted_server_host = server_host.wire_format(
+                unwrap_foreign_keys=False)
+        self.assertTrue(bare_formatted_server_host.get('labels') is None and
+                        bare_formatted_server_host.get('acls') is None)
+
+
+    def testLeasing(self):
+        """Test that leasing a leased host raises an exception.
+
+        @raises AssertionError: If double leasing a host doesn't raise
+            an RDBException, or the leased bits are not set after the
+            first attempt at leasing it.
+        @raises RDBException: If the host is created with the leased bit set.
+        """
+        hostname = 'h1'
+        server_host = rdb_hosts.RDBServerHostWrapper(
+                self.db_helper.create_host(hostname))
+        server_host.lease()
+        host = self.db_helper.get_host(hostname=hostname)[0]
+        self.assertTrue(host.leased and server_host.leased)
+        self.assertRaises(rdb_utils.RDBException, server_host.lease)
+
+
+    def testPlatformAndLabels(self):
+        """Test that a client host returns the right platform and labels.
+
+        @raises AssertionError: If client host cannot return the right platform
+            and labels.
+        """
+        platform_name = 'x86'
+        label_names = ['a', 'b']
+        self.db_helper.create_label(name=platform_name, platform=True)
+        server_host = rdb_hosts.RDBServerHostWrapper(
+                self.db_helper.create_host(
+                        'h1', deps=set(label_names + [platform_name])))
+        client_host = rdb_hosts.RDBClientHostWrapper(
+                **server_host.wire_format())
+        platform, labels = client_host.platform_and_labels()
+        self.assertTrue(platform == platform_name)
+        self.assertTrue(set(labels) == set(label_names))
+
+
+    def testClientUpdateSave(self):
+        """Test that a client host is capable of saving its attributes.
+
+        Create a client host, set its attributes and verify that the attributes
+        are saved properly by recreating a server host and checking them.
+
+        @raises AssertionError: If the server host has the wrong attributes.
+        """
+        hostname = 'h1'
+        db_host = self.db_helper.create_host(hostname, leased=True)
+        server_host_dict = rdb_hosts.RDBServerHostWrapper(db_host).wire_format()
+        client_host = rdb_hosts.RDBClientHostWrapper(**server_host_dict)
+
+        host_data = {'hostname': hostname, 'id': db_host.id}
+        default_values = rdb_models.AbstractHostModel.provide_default_values(
+                host_data)
+        for k, v in default_values.iteritems():
+            self.assertTrue(server_host_dict[k] == v)
+
+        updated_client_fields = {
+                    'locked': True,
+                    'leased': False,
+                    'status': 'FakeStatus',
+                    'invalid': True,
+                    'protection': 1,
+                    'dirty': True,
+                }
+        client_host.__dict__.update(updated_client_fields)
+        client_host.save()
+
+        updated_server_host = rdb_hosts.RDBServerHostWrapper(
+                self.db_helper.get_host(hostname=hostname)[0]).wire_format()
+        for k, v in updated_client_fields.iteritems():
+            self.assertTrue(updated_server_host[k] == v)
+
+
+    def testUpdateField(self):
+        """Test that update field on the client host works as expected.
+
+        @raises AssertionError: If a bad update is processed without an
+            exception, or a good update isn't processed as expected.
+        """
+        hostname = 'h1'
+        db_host = self.db_helper.create_host(hostname, dirty=False)
+        server_host_dict = rdb_hosts.RDBServerHostWrapper(db_host).wire_format()
+        client_host = rdb_hosts.RDBClientHostWrapper(**server_host_dict)
+        self.assertRaises(rdb_utils.RDBException, client_host.update_field,
+                          *('id', 'fakeid'))
+        self.assertRaises(rdb_utils.RDBException, client_host.update_field,
+                          *('Nonexist', 'Nonexist'))
+        client_host.update_field('dirty', True)
+        self.assertTrue(
+                self.db_helper.get_host(hostname=hostname)[0].dirty == True and
+                client_host.dirty == True)
+        new_status = 'newstatus'
+        client_host.set_status(new_status)
+        self.assertTrue(
+                self.db_helper.get_host(hostname=hostname)[0].status ==
+                new_status and client_host.status == new_status)
+
+
+
+
diff --git a/scheduler/rdb_integration_tests.py b/scheduler/rdb_integration_tests.py
index 6f66722..95e8da6 100644
--- a/scheduler/rdb_integration_tests.py
+++ b/scheduler/rdb_integration_tests.py
@@ -8,15 +8,22 @@
 import collections
 
 import common
-from autotest_lib.frontend import setup_django_environment
-from autotest_lib.frontend.afe import frontend_test_utils
+
+from autotest_lib.client.common_lib import host_queue_entry_states
 from autotest_lib.client.common_lib.test_utils import unittest
 from autotest_lib.database import database_connection
+from autotest_lib.frontend import setup_django_environment
+from autotest_lib.frontend.afe import frontend_test_utils
 from autotest_lib.frontend.afe import models
+from autotest_lib.frontend.afe import rdb_model_extensions
 from autotest_lib.scheduler import monitor_db
 from autotest_lib.scheduler import monitor_db_functional_test
 from autotest_lib.scheduler import scheduler_models
-from autotest_lib.scheduler import rdb_lib, rdb_requests
+from autotest_lib.scheduler import rdb
+from autotest_lib.scheduler import rdb_hosts
+from autotest_lib.scheduler import rdb_lib
+from autotest_lib.scheduler import rdb_requests
+
 
 # Set for verbose table creation output.
 _DEBUG = False
@@ -35,9 +42,28 @@
 
 
     @classmethod
-    def create_label(cls, name):
-        label = models.Label.objects.filter(name=name)
-        return models.Label.add_object(name=name) if not label else label[0]
+    def get_labels(cls, **kwargs):
+        """Get a label queryset based on the kwargs."""
+        return models.Label.objects.filter(**kwargs)
+
+
+    @classmethod
+    def get_acls(cls, **kwargs):
+        """Get an aclgroup queryset based on the kwargs."""
+        return models.AclGroup.objects.filter(**kwargs)
+
+
+    @classmethod
+    def get_host(cls, **kwargs):
+        """Get a host queryset based on the kwargs."""
+        return models.Host.objects.filter(**kwargs)
+
+
+    @classmethod
+    def create_label(cls, name, **kwargs):
+        label = cls.get_labels(name=name, **kwargs)
+        return (models.Label.add_object(name=name, **kwargs)
+                if not label else label[0])
 
 
     @classmethod
@@ -55,6 +81,13 @@
 
 
     @classmethod
+    def create_acl_group(cls, name):
+        aclgroup = cls.get_acls(name=name)
+        return (models.AclGroup.add_object(name=name)
+                if not aclgroup else aclgroup[0])
+
+
+    @classmethod
     def add_deps_to_job(cls, job, dep_names=set([])):
         label_objects = set([])
         for label in dep_names:
@@ -63,13 +96,6 @@
 
 
     @classmethod
-    def create_acl_group(cls, name):
-        aclgroup = models.AclGroup.objects.filter(name=name)
-        return (models.AclGroup.add_object(name=name)
-                if not aclgroup else aclgroup[0])
-
-
-    @classmethod
     def add_host_to_aclgroup(cls, host, aclgroup_names=set([])):
         for group_name in aclgroup_names:
             aclgroup = cls.create_acl_group(group_name)
@@ -114,7 +140,24 @@
         # Though we can return the host object above, this proves that the host
         # actually got saved in the database. For example, this will return none if
         # save() wasn't called on the model.Host instance.
-        return models.Host.objects.filter(hostname=name)[0]
+        return cls.get_host(hostname=name)[0]
+
+
+    @classmethod
+    def add_host_to_job(cls, host, job_id):
+        """Add a host to the hqe of a job.
+
+        @param host: An instance of the host model.
+        @param job_id: The job to which we need to add the host.
+
+        @raises ValueError: If the hqe for the job already has a host,
+            or if the host argument isn't a Host instance.
+        """
+        hqe = models.HostQueueEntry.objects.get(job_id=job_id)
+        if hqe.host:
+            raise ValueError('HQE for job %s already has a host' % job_id)
+        hqe.host = host
+        hqe.save()
 
 
     @classmethod
@@ -124,7 +167,7 @@
         job.save()
 
 
-class PriorityAssignmentValidator(object):
+class AssignmentValidator(object):
     """Utility class to check that priority inversion doesn't happen. """
 
 
@@ -156,7 +199,7 @@
         if not hosts or not request:
             return None
         for host in hosts:
-            if PriorityAssignmentValidator.check_acls_deps(host, request):
+            if AssignmentValidator.check_acls_deps(host, request):
                 return host
 
 
@@ -173,6 +216,27 @@
 
 
     @staticmethod
+    def verify_priority(request_queue, result):
+        requests = AssignmentValidator.sort_requests(request_queue)
+        for request, count in requests:
+            hosts = result.get(request)
+            # The request was completely satisfied.
+            if hosts and len(hosts) == count:
+                continue
+            # Go through all hosts given to lower priority requests and
+            # make sure we couldn't have allocated one of them for this
+            # unsatisfied higher priority request.
+            lower_requests = requests[requests.index((request,count))+1:]
+            for lower_request, count in lower_requests:
+                if (lower_request.priority < request.priority and
+                    AssignmentValidator.find_matching_host_for_request(
+                            result.get(lower_request), request)):
+                    raise ValueError('Priority inversion occurred between '
+                            'priorities %s and %s' %
+                            (request.priority, lower_request.priority))
+
+
+    @staticmethod
     def priority_checking_response_handler(request_manager):
         """Fake response handler wrapper for any request_manager.
 
@@ -189,26 +253,13 @@
         # could not have been satisfied by hosts assigned to requests lower
         # down in the list.
         result = request_manager.api_call(request_manager.request_queue)
-        requests = PriorityAssignmentValidator.sort_requests(
-                request_manager.request_queue)
-        for request, count in requests:
-            hosts = result.get(request)
-            # The request was completely satisfied.
-            if hosts and len(hosts) == count:
-                continue
-            # Go through all hosts given to lower priority requests and
-            # make sure we couldn't have allocated one of them for this
-            # unsatisfied higher priority request.
-            lower_requests = requests[requests.index((request,count))+1:]
-            for lower_request, count in lower_requests:
-                if (PriorityAssignmentValidator.find_matching_host_for_request(
-                    result.get(lower_request), request)):
-                    raise ValueError('Priority inversion occured between '
-                            'priorities %s and %s' %
-                            (request.priority, lower_request.priority))
-        # Though we've confirmed behavior, the rdb_lib method that is using this
-        # request manager needs to exit cleanly.
-        yield None
+        if not result:
+            raise ValueError('Expected results but got none.')
+        AssignmentValidator.verify_priority(
+                request_manager.request_queue, result)
+        for hosts in result.values():
+            for host in hosts:
+                yield host
 
 
 class BaseRDBTest(unittest.TestCase, frontend_test_utils.FrontendTestMixin):
@@ -238,6 +289,7 @@
 
     def tearDown(self):
         """Teardown the host/job database established through setUp. """
+        self.god.unstub_all()
         self._database.disconnect()
         self._frontend_common_teardown()
 
@@ -261,15 +313,49 @@
         # another label to the rdb.
         if not deps:
             raise ValueError('Need at least one dep for metahost')
-        metahost = DBHelper.create_label(list(deps)[0])
-        job = self._create_job(metahosts=[metahost.id], priority=priority,
-                owner=user)
+
+        # TODO: This is a hack around the fact that frontend_test_utils still
+        # need a metahost, but metahost is treated like any other label.
+        metahost = self.db_helper.create_label(list(deps)[0])
+        job = self._create_job(
+                metahosts=[metahost.id], priority=priority, owner=user)
         self.assert_(len(job.hostqueueentry_set.all()) == 1)
-        DBHelper.add_deps_to_job(job, dep_names=list(deps)[1:])
-        DBHelper.add_user_to_aclgroups(user, aclgroup_names=acls)
+        self.db_helper.add_deps_to_job(job, dep_names=list(deps)[1:])
+        self.db_helper.add_user_to_aclgroups(user, aclgroup_names=acls)
         return models.Job.objects.filter(id=job.id)[0]
 
 
+    def assert_host_db_status(self, host_id):
+        """Assert host state right after acquisition.
+
+        Call this method to check the status of any host leased by the
+        rdb before it has been assigned to an hqe. It must be leased and
+        ready at this point in time.
+
+        @param host_id: Id of the host to check.
+
+        @raises AssertionError: If the host is not leased or not Ready.
+        """
+        host = models.Host.objects.get(id=host_id)
+        self.assert_(host.leased)
+        self.assert_(host.status == 'Ready')
+
+
+    def check_hosts(self, host_iter):
+        """Sanity check all hosts in host_iter.
+
+        @param host_iter: A generator/iterator of RDBClientHostWrappers.
+            eg: The generator returned by rdb_lib.acquire_hosts. If a request
+            was not satisfied this iterator can contain None.
+
+        @raises AssertionError: If any of the sanity checks fail.
+        """
+        for host in host_iter:
+            if host:
+                self.assert_host_db_status(host.id)
+                self.assert_(host.leased == 1)
+
+
     def check_host_assignment(self, job_id, host_id):
         """Check is a job<->host assignment is valid.
 
@@ -285,7 +371,6 @@
         @param host_id: The id of the host to check for compatibility.
 
         @raises AssertionError: If the job and the host are incompatible.
-            This will happen
         """
         job = models.Job.objects.get(id=job_id)
         host = models.Host.objects.get(id=host_id)
@@ -296,10 +381,7 @@
         all_hqes = models.HostQueueEntry.objects.filter(host_id=host_id, complete=0)
         self.assert_(len(all_hqes) <= 1)
         self.assert_(hqe.host_id == None)
-
-        # Assert basic host status.
-        self.assert_(host.leased)
-        self.assert_(host.status == 'Ready')
+        self.assert_host_db_status(host_id)
 
         # Assert that all deps of the job are satisfied.
         job_deps = set([d.name for d in job.dependency_labels.all()])
@@ -315,6 +397,97 @@
         self.assert_(job_owner_aclgroups.intersection(host_aclgroups))
 
 
+    def testAcquireLeasedHostBasic(self):
+        """Test that acquisition of a leased host doesn't happen.
+
+        @raises AssertionError: If the one host that satisfies the request
+            is acquired.
+        """
+        job = self.create_job(deps=set(['a']))
+        host = self.db_helper.create_host('h1', deps=set(['a']))
+        host.leased = 1
+        host.save()
+        queue_entries = self._dispatcher._refresh_pending_queue_entries()
+        hosts = list(rdb_lib.acquire_hosts(
+            self.host_scheduler, queue_entries))
+        self.assertTrue(len(hosts) == 1 and hosts[0] is None)
+
+
+    def testAcquireLeasedHostRace(self):
+        """Test behaviour when hosts are leased just before acquisition.
+
+        If a fraction of the hosts somehow get leased between finding and
+        acquisition, the rdb should just return the remaining hosts for the
+        request to use.
+
+        @raises AssertionError: If both the requests get a host successfully,
+            since one host gets leased before the final attempt to lease both.
+        """
+        j1 = self.create_job(deps=set(['a']))
+        j2 = self.create_job(deps=set(['a']))
+        hosts = [self.db_helper.create_host('h1', deps=set(['a'])),
+                 self.db_helper.create_host('h2', deps=set(['a']))]
+
+        @rdb_hosts.return_rdb_host
+        def local_find_hosts(host_query_manager, deps, acls):
+            """Return a predetermined list of hosts, one of which is leased."""
+            h1 = models.Host.objects.get(hostname='h1')
+            h1.leased = 1
+            h1.save()
+            h2 = models.Host.objects.get(hostname='h2')
+            return [h1, h2]
+
+        self.god.stub_with(rdb.AvailableHostQueryManager, 'find_hosts',
+                           local_find_hosts)
+        queue_entries = self._dispatcher._refresh_pending_queue_entries()
+        hosts = list(rdb_lib.acquire_hosts(
+            self.host_scheduler, queue_entries))
+        self.assertTrue(len(hosts) == 2 and None in hosts)
+        self.check_hosts(iter(hosts))
+
+
+    def testHostReleaseStates(self):
+        """Test that we will only release an unused host if it is in Ready.
+
+        @raises AssertionError: If the host gets released in any other state.
+        """
+        host = self.db_helper.create_host('h1', deps=set(['x']))
+        for state in rdb_model_extensions.AbstractHostModel.Status.names:
+            host.status = state
+            host.leased = 1
+            host.save()
+            self._release_unused_hosts()
+            host = models.Host.objects.get(hostname='h1')
+            self.assertTrue(host.leased == (state != 'Ready'))
+
+
+    def testHostReleaseHQE(self):
+        """Test that we will not release a ready host if it's being used.
+
+        @raises AssertionError: If the host is released even though it has
+            been assigned to an active hqe.
+        """
+        # Create a host and lease it out in Ready.
+        host = self.db_helper.create_host('h1', deps=set(['x']))
+        host.status = 'Ready'
+        host.leased = 1
+        host.save()
+
+        # Create a job and give its hqe the leased host.
+        job = self.create_job(deps=set(['x']))
+        self.db_helper.add_host_to_job(host, job.id)
+        hqe = models.HostQueueEntry.objects.get(job_id=job.id)
+
+        # Activate the hqe by setting its state.
+        hqe.status = host_queue_entry_states.ACTIVE_STATUSES[0]
+        hqe.save()
+
+        # Make sure the hqe's host isn't released, even if it's in Ready.
+        self._release_unused_hosts()
+        host = models.Host.objects.get(hostname='h1')
+        self.assertTrue(host.leased == 1)
+
+
     def testBasicDepsAcls(self):
         """Test a basic deps/acls request.
 
@@ -326,12 +499,13 @@
         """
         deps = set(['a', 'b'])
         acls = set(['a', 'b'])
-        DBHelper.create_host('h1', deps=deps, acls=acls)
+        self.db_helper.create_host('h1', deps=deps, acls=acls)
         job = self.create_job(user='autotest_system', deps=deps, acls=acls)
         queue_entries = self._dispatcher._refresh_pending_queue_entries()
         matching_host  = rdb_lib.acquire_hosts(
                 self.host_scheduler, queue_entries).next()
         self.check_host_assignment(job.id, matching_host.id)
+        self.assertTrue(matching_host.leased == 1)
 
 
     def testBadDeps(self):
@@ -343,7 +517,7 @@
         host_labels = set(['a'])
         job_deps = set(['b'])
         acls = set(['a', 'b'])
-        DBHelper.create_host('h1', deps=host_labels, acls=acls)
+        self.db_helper.create_host('h1', deps=host_labels, acls=acls)
         job = self.create_job(user='autotest_system', deps=job_deps, acls=acls)
         queue_entries = self._dispatcher._refresh_pending_queue_entries()
         matching_host  = rdb_lib.acquire_hosts(
@@ -360,7 +534,7 @@
         deps = set(['a'])
         host_acls = set(['a'])
         job_acls = set(['b'])
-        DBHelper.create_host('h1', deps=deps, acls=host_acls)
+        self.db_helper.create_host('h1', deps=deps, acls=host_acls)
 
         # Create the job as a new user who is only in the 'b' and 'Everyone'
         # aclgroups. Though there are several hosts in the Everyone group, the
@@ -377,14 +551,14 @@
 
         Schedule 2 jobs with the same deps, acls and user, but different
         priorities, and confirm that the higher priority request gets the host.
-        This confirmation happens through the PriorityAssignmentValidator.
+        This confirmation happens through the AssignmentValidator.
 
         @raises AssertionError: If the un important request gets host h1 instead
             of the important request.
         """
         deps = set(['a', 'b'])
         acls = set(['a', 'b'])
-        DBHelper.create_host('h1', deps=deps, acls=acls)
+        self.db_helper.create_host('h1', deps=deps, acls=acls)
         important_job = self.create_job(user='autotest_system',
                 deps=deps, acls=acls, priority=2)
         un_important_job = self.create_job(user='autotest_system',
@@ -392,8 +566,9 @@
         queue_entries = self._dispatcher._refresh_pending_queue_entries()
 
         self.god.stub_with(rdb_requests.BaseHostRequestManager, 'response',
-                PriorityAssignmentValidator.priority_checking_response_handler)
-        list(rdb_lib.acquire_hosts(self.host_scheduler, queue_entries))
+                AssignmentValidator.priority_checking_response_handler)
+        self.check_hosts(rdb_lib.acquire_hosts(
+            self.host_scheduler, queue_entries))
 
 
     def testPriorityLevels(self):
@@ -408,7 +583,7 @@
         """
         deps = set(['a', 'b'])
         acls = set(['a', 'b'])
-        DBHelper.create_host('h1', deps=deps, acls=acls)
+        self.db_helper.create_host('h1', deps=deps, acls=acls)
 
         # Create jobs that will bucket differently and confirm that jobs in an
         # earlier bucket get a host.
@@ -421,20 +596,88 @@
         queue_entries = self._dispatcher._refresh_pending_queue_entries()
 
         self.god.stub_with(rdb_requests.BaseHostRequestManager, 'response',
-                PriorityAssignmentValidator.priority_checking_response_handler)
-        list(rdb_lib.acquire_hosts(self.host_scheduler, queue_entries))
+                AssignmentValidator.priority_checking_response_handler)
+        self.check_hosts(rdb_lib.acquire_hosts(
+            self.host_scheduler, queue_entries))
 
         # Elevate the priority of the unimportant job, so we now have
         # 2 jobs at the same priority.
         self.db_helper.increment_priority(job_id=unimportant_job.id)
         queue_entries = self._dispatcher._refresh_pending_queue_entries()
         self._release_unused_hosts()
-        list(rdb_lib.acquire_hosts(self.host_scheduler, queue_entries))
+        self.check_hosts(rdb_lib.acquire_hosts(
+            self.host_scheduler, queue_entries))
 
         # Prioritize the first job, and confirm that it gets the host over the
         # jobs that got it the last time.
         self.db_helper.increment_priority(job_id=unimportant_job.id)
         queue_entries = self._dispatcher._refresh_pending_queue_entries()
         self._release_unused_hosts()
-        list(rdb_lib.acquire_hosts(self.host_scheduler, queue_entries))
+        self.check_hosts(rdb_lib.acquire_hosts(
+            self.host_scheduler, queue_entries))
+
+
+    def testFrontendJobScheduling(self):
+        """Test basic frontend job scheduling.
+
+        @raises AssertionError: If the received and requested host don't match,
+            or the mis-matching host is returned instead.
+        """
+        deps = set(['x', 'y'])
+        acls = set(['a', 'b'])
+
+        # Create 2 frontend jobs and only one matching host.
+        matching_job = self.create_job(acls=acls, deps=deps)
+        matching_host = self.db_helper.create_host('h1', acls=acls, deps=deps)
+        mis_matching_job = self.create_job(acls=acls, deps=deps)
+        mis_matching_host = self.db_helper.create_host(
+                'h2', acls=acls, deps=deps.pop())
+        self.db_helper.add_host_to_job(matching_host, matching_job.id)
+        self.db_helper.add_host_to_job(mis_matching_host, mis_matching_job.id)
+
+        # Check that only the matching host is returned, and that we get 'None'
+        # for the second request.
+        queue_entries = self._dispatcher._refresh_pending_queue_entries()
+        hosts = list(rdb_lib.acquire_hosts(self.host_scheduler, queue_entries))
+        self.assertTrue(len(hosts) == 2 and None in hosts)
+        returned_host = [host for host in hosts if host].pop()
+        self.assertTrue(matching_host.id == returned_host.id)
+
+
+    def testFrontendJobPriority(self):
+        """Test that frontend job scheduling doesn't ignore priorities.
+
+        @raises ValueError: If the priorities of frontend jobs are ignored.
+        """
+        board = 'x'
+        high_priority = self.create_job(priority=2, deps=set([board]))
+        low_priority = self.create_job(priority=1, deps=set([board]))
+        host = self.db_helper.create_host('h1', deps=set([board]))
+        self.db_helper.add_host_to_job(host, low_priority.id)
+        self.db_helper.add_host_to_job(host, high_priority.id)
+
+        queue_entries = self._dispatcher._refresh_pending_queue_entries()
+
+        def local_response_handler(request_manager):
+            """Confirms that a higher priority frontend job gets a host.
+
+            @raises ValueError: If priority inversion happens and the job
+                with priority 1 gets the host instead.
+            """
+            result = request_manager.api_call(request_manager.request_queue)
+            if not result:
+                raise ValueError('Expected the high priority request to '
+                                 'get a host, but the result is empty.')
+            for request, hosts in result.iteritems():
+                if request.priority == 1:
+                    raise ValueError('Priority of frontend job ignored.')
+                if len(hosts) > 1:
+                    raise ValueError('Multiple hosts returned against one '
+                                     'frontend job scheduling request.')
+                yield hosts[0]
+
+        self.god.stub_with(rdb_requests.BaseHostRequestManager, 'response',
+                           local_response_handler)
+        self.check_hosts(rdb_lib.acquire_hosts(
+            self.host_scheduler, queue_entries))
 
diff --git a/scheduler/rdb_unittest.py b/scheduler/rdb_unittest.py
new file mode 100644
index 0000000..49cbf8b
--- /dev/null
+++ b/scheduler/rdb_unittest.py
@@ -0,0 +1,235 @@
+# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import mock
+import unittest
+
+import common
+from autotest_lib.frontend import setup_django_lite_environment
+from autotest_lib.frontend.afe import frontend_test_utils
+from autotest_lib.frontend.afe import rdb_model_extensions as rdb_models
+from autotest_lib.scheduler import rdb_integration_tests
+from autotest_lib.scheduler import rdb
+from autotest_lib.scheduler import rdb_hosts
+from autotest_lib.scheduler import rdb_requests
+from autotest_lib.scheduler import rdb_utils
+
+
+from django.core import exceptions as django_exceptions
+from django.db.models import fields
+
+
+class FakeHost(rdb_hosts.RDBHost):
+    """Fake host to use in unittests."""
+
+    def __init__(self, hostname, host_id, **kwargs):
+        kwargs.update({'hostname': hostname, 'id': host_id})
+        kwargs = rdb_models.AbstractHostModel.provide_default_values(
+                kwargs)
+        super(FakeHost, self).__init__(**kwargs)
+
+
+class RDBBaseRequestHandlerTests(unittest.TestCase):
+    """Base Request Handler Unittests."""
+
+    def setUp(self):
+        self.handler = rdb.BaseHostRequestHandler()
+        self.handler.host_query_manager = mock.MagicMock()
+        self.update_manager = rdb_requests.BaseHostRequestManager(
+                rdb_requests.UpdateHostRequest, rdb.update_hosts)
+        self.get_hosts_manager = rdb_requests.BaseHostRequestManager(
+                rdb_requests.HostRequest, rdb.get_hosts)
+
+
+    def tearDown(self):
+        self.handler.host_query_manager.reset_mock()
+
+
+    def testResponseMapUpdate(self):
+        """Test response map behaviour.
+
+        Test that either adding an empty response against a request, or 2
+        responses for the same request will raise an exception.
+        """
+        self.get_hosts_manager.add_request(host_id=1)
+        request = self.get_hosts_manager.request_queue[0]
+        response = []
+        self.assertRaises(
+                rdb_utils.RDBException, self.handler.update_response_map,
+                *(request, response))
+        response.append(FakeHost(hostname='host', host_id=1))
+        self.handler.update_response_map(request, response)
+        self.assertRaises(
+                rdb_utils.RDBException, self.handler.update_response_map,
+                *(request, response))
+
+
+    def testBatchGetHosts(self):
+        """Test getting hosts.
+
+        Verify that:
+            1. We actually call get_hosts on the query_manager for a
+                batched_get_hosts request.
+            2. The hosts returned are matched up correctly with requests,
+                and each request gets exactly one response.
+            3. The hosts returned have all the fields needed to create an
+                RDBClientHostWrapper, in spite of having gone through the
+                to_wire process of serialization in get_response.
+        """
+        fake_hosts = []
+        for host_id in range(1, 4):
+            self.get_hosts_manager.add_request(host_id=host_id)
+            fake_hosts.append(FakeHost('host%s'%host_id, host_id))
+        self.handler.host_query_manager.get_hosts = mock.MagicMock(
+                return_value=fake_hosts)
+        self.handler.batch_get_hosts(self.get_hosts_manager.request_queue)
+        for request, hosts in self.handler.get_response().iteritems():
+            self.assertTrue(len(hosts) == 1)
+            client_host = rdb_hosts.RDBClientHostWrapper(**hosts[0])
+            self.assertTrue(request.host_id == client_host.id)
+
+
+    def testSingleUpdateRequest(self):
+        """Test that a single host update request hits the query manager."""
+        payload = {'status': 'Ready'}
+        host_id = 10
+        self.update_manager.add_request(host_id=host_id, payload=payload)
+        self.handler.update_hosts(self.update_manager.request_queue)
+        self.handler.host_query_manager.update_hosts.assert_called_once_with(
+                [host_id], **payload)
+
+
+    def testDedupingSameHostRequests(self):
+        """Test same host 2 updates deduping."""
+        payload_1 = {'status': 'Ready'}
+        payload_2 = {'locked': True}
+        host_id = 10
+        self.update_manager.add_request(host_id=host_id, payload=payload_1)
+        self.update_manager.add_request(host_id=host_id, payload=payload_2)
+        self.handler.update_hosts(self.update_manager.request_queue)
+        self.handler.host_query_manager.update_hosts.assert_called_once_with(
+                [host_id], **dict(payload_1.items() + payload_2.items()))
+
+
+    def testLastUpdateWins(self):
+        """Test 2 updates to the same row x column."""
+        payload_1 = {'status': 'foobar'}
+        payload_2 = {'status': 'Ready'}
+        host_id = 10
+        self.update_manager.add_request(host_id=host_id, payload=payload_1)
+        self.update_manager.add_request(host_id=host_id, payload=payload_2)
+        self.handler.update_hosts(self.update_manager.request_queue)
+        self.handler.host_query_manager.update_hosts.assert_called_once_with(
+                [host_id], **payload_2)
+
+
+    def testDedupingSamePayloadRequests(self):
+        """Test same payload for 2 hosts only hits the db once."""
+        payload = {'status': 'Ready'}
+        host_1_id = 10
+        host_2_id = 20
+        self.update_manager.add_request(host_id=host_1_id, payload=payload)
+        self.update_manager.add_request(host_id=host_2_id, payload=payload)
+        self.handler.update_hosts(self.update_manager.request_queue)
+        self.handler.host_query_manager.update_hosts.assert_called_once_with(
+                [host_1_id, host_2_id], **payload)
+
+
+    def testUpdateException(self):
+        """Test update exception handling.
+
+        1. An exception raised while processing one update shouldn't prevent
+            the others.
+        2. The exception should get serialized as a string and returned via the
+            response map.
+        """
+        payload = {'status': 'Ready'}
+        exception_msg = 'Bad Field'
+        exception_types = [django_exceptions.FieldError,
+                           fields.FieldDoesNotExist]
+        self.update_manager.add_request(host_id=11, payload=payload)
+        self.update_manager.add_request(host_id=10, payload=payload)
+        mock_query_manager = self.handler.host_query_manager
+
+        for e, request in zip(
+                exception_types, self.update_manager.request_queue):
+            mock_query_manager.update_hosts.side_effect = e(exception_msg)
+            self.handler.update_hosts([request])
+
+        response = self.handler.get_response()
+        for request in self.update_manager.request_queue:
+            self.assertTrue(exception_msg in response.get(request))
+
+
+class QueryManagerTests(unittest.TestCase,
+                        frontend_test_utils.FrontendTestMixin):
+    """Query Manager Tests."""
+
+    def setUp(self):
+        self.db_helper = rdb_integration_tests.DBHelper()
+        self._database = self.db_helper.database
+
+        # Runs syncdb setting up initial database conditions
+        self._frontend_common_setup()
+        self.available_hosts_query_manager = rdb.AvailableHostQueryManager()
+        self.all_hosts_query_manager = rdb.BaseHostQueryManager()
+
+
+    def tearDown(self):
+        self._database.disconnect()
+        self._frontend_common_teardown()
+
+
+    def testFindHosts(self):
+        """Test finding hosts.
+
+        Tests that we can only find unleased hosts through the
+        available_hosts_query_manager.
+        """
+        deps = set(['a', 'b'])
+        acls = set(['a'])
+        db_host = self.db_helper.create_host(
+                name='h1', deps=deps, acls=acls, leased=1)
+        hosts = self.all_hosts_query_manager.find_hosts(
+                deps=[label.id for label in db_host.labels.all()],
+                acls=[aclgroup.id for aclgroup in db_host.aclgroup_set.all()])
+        self.assertTrue(type(hosts) == list and len(hosts) == 1)
+        hosts = self.available_hosts_query_manager.find_hosts(
+                deps=[label.id for label in db_host.labels.all()],
+                acls=[aclgroup.id for aclgroup in db_host.aclgroup_set.all()])
+        # We should get an empty list if there are no matching hosts, not a
+        # QuerySet or None.
+        self.assertTrue(len(hosts) == 0)
+
+
+    def testUpdateHosts(self):
+        """Test updating hosts.
+
+        Test that we can only update unleased hosts through the
+        available_hosts_query_manager.
+        """
+        deps = set(['a', 'b'])
+        acls = set(['a'])
+        db_host = self.db_helper.create_host(
+                name='h1', deps=deps, acls=acls, leased=1)
+        # Confirm that the available_hosts_manager can't see the leased host.
+        self.assertTrue(
+                len(self.available_hosts_query_manager.get_hosts(
+                        [db_host.id])) == 0)
+
+        # Confirm that the available_hosts_manager can't update a leased host.
+        # Also confirm that the general query manager can see the leased host.
+        self.available_hosts_query_manager.update_hosts(
+                [db_host.id], **{'leased': 0})
+        hosts = self.all_hosts_query_manager.get_hosts([db_host.id])
+        self.assertTrue(len(hosts) == 1 and hosts[0].leased)
+
+        # Confirm that we can in fact update the leased bit on the host.
+        self.all_hosts_query_manager.update_hosts(
+                [hosts[0].id], **{'leased': 0})
+        hosts = self.all_hosts_query_manager.get_hosts([hosts[0].id])
+        self.assertTrue(len(hosts) == 1 and not hosts[0].leased)
+
+
+
diff --git a/scheduler/rdb_utils.py b/scheduler/rdb_utils.py
index 9373341..de83d3d 100644
--- a/scheduler/rdb_utils.py
+++ b/scheduler/rdb_utils.py
@@ -6,7 +6,6 @@
 
 Do not import rdb or autotest modules here to avoid cyclic dependencies.
 """
-import itertools
 
 
 class RDBException(Exception):
@@ -18,55 +17,27 @@
         return str(self)
 
 
-# Custom iterators: Used by the rdb to lazily convert the iteration of a
-# queryset to a database query and return an appropriately formatted response.
-class RememberingIterator(object):
-    """An iterator capable of reproducing all values in the input generator.
-    """
-
-    #pylint: disable-msg=C0111
-    def __init__(self, gen):
-        self.current, self.history = itertools.tee(gen)
-        self.items = []
-
-
-    def __iter__(self):
-        return self
-
-
-    def next(self):
-        return self.current.next()
-
-
-    def get_all_items(self):
-        """Get all the items in the generator this object was created with.
-
-        @return: A list of items.
-        """
-        if not self.items:
-            self.items = list(self.history)
-        return self.items
-
-
-class LabelIterator(RememberingIterator):
-    """A RememberingIterator for labels.
+class LabelIterator(object):
+    """An Iterator for labels.
 
     Within the rdb any label/dependency comparisons are performed based on label
     ids. However, the host object returned needs to contain label names instead.
-    This class returns the label id when iterated over, but a list of all label
-    names when accessed through get_all_items.
+    This class returns label ids for iteration, but a list of all label names
+    when accessed through get_label_names.
     """
 
-
-    def next(self):
-        return super(LabelIterator, self).next().id
+    def __init__(self, labels):
+        self.labels = labels
 
 
-    def get_all_items(self):
-        """Get all label names of the labels in the input generator.
+    def __iter__(self):
+        return iter(label.id for label in self.labels)
+
+
+    def get_label_names(self):
+        """Get all label names of the labels associated with this class.
 
         @return: A list of label names.
         """
-        return [label.name
-                for label in super(LabelIterator, self).get_all_items()]
+        return [label.name for label in self.labels]
 
diff --git a/utils/unittest_suite.py b/utils/unittest_suite.py
index e771a64..180fae5 100755
--- a/utils/unittest_suite.py
+++ b/utils/unittest_suite.py
@@ -43,6 +43,8 @@
         'execution_engine_unittest.py',
         'service_proxy_lib_test.py',
         'rdb_integration_tests.py',
+        'rdb_unittest.py',
+        'rdb_hosts_unittest.py',
         ))
 
 REQUIRES_MYSQLDB = set((