[autotest] RDB Refactor II + Request/Response API.

Scheduler Refactor:
1. Batched processing of jobs.
2. Rdb hits the database instead of going through host_scheduler.
3. Migration to add a leased column. The scheduler releases hosts
    back to the rdb every tick.
4. Client rdb host that queue_entries use to track a host, instead
    of a database model.

Establishes a basic request/response api for the rdb:
rdb_utils:
    1. Requests: Assert the format and fields of some basic request types.
    2. Helper client/server modules to communicate with the rdb.
rdb_lib:
    1. Request managers for rdb methods:
        a. Match request-response
        b. Abstract the batching of requests.
    2. JobQueryManager: Regulates database access for job information.
rdb:
    1. QueryManagers: Regulate database access
    2. RequestHandlers: Use query managers to get things done.
    3. Dispatchers: Send incoming requests to the appropriate handlers.
Ignores wire formats.

TEST=unittests, functional verification.
BUG=chromium:314081, chromium:314083, chromium:314084
DEPLOY=scheduler, migrate

Change-Id: Id174c663c6e78295d365142751053eae4023116d
Reviewed-on: https://chromium-review.googlesource.com/183385
Reviewed-by: Prashanth B <beeps@chromium.org>
Commit-Queue: Prashanth B <beeps@chromium.org>
Tested-by: Prashanth B <beeps@chromium.org>
diff --git a/scheduler/agent_task.py b/scheduler/agent_task.py
index 7cd28f6..87ec371 100644
--- a/scheduler/agent_task.py
+++ b/scheduler/agent_task.py
@@ -116,6 +116,7 @@
 from autotest_lib.scheduler import drone_manager, pidfile_monitor
 from autotest_lib.client.common_lib import utils
 from autotest_lib.scheduler import email_manager, host_scheduler
+from autotest_lib.scheduler import rdb_lib
 from autotest_lib.scheduler import scheduler_models
 from autotest_lib.server import autoserv_utils
 
@@ -513,11 +514,13 @@
 
         assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
 
-        self.host = scheduler_models.Host(id=task.host.id)
+        self.host = rdb_lib.get_hosts([task.host.id])[0]
+        self.host.dbg_str = 'Task: %s' % str(task)
         self.queue_entry = None
         if task.queue_entry:
             self.queue_entry = scheduler_models.HostQueueEntry(
                     id=task.queue_entry.id)
+            self.host.dbg_str += self.queue_entry.get_dbg_str()
 
         self.task = task
         self._extra_command_args = extra_command_args
diff --git a/scheduler/host_scheduler.py b/scheduler/host_scheduler.py
index 7fe4e4e..6d933a6 100644
--- a/scheduler/host_scheduler.py
+++ b/scheduler/host_scheduler.py
@@ -1,5 +1,4 @@
-"""
-Autotest client module for the rdb.
+"""Autotest host scheduler.
 """
 
 
@@ -8,7 +7,7 @@
 from autotest_lib.client.common_lib import global_config, utils
 from autotest_lib.frontend.afe import models
 from autotest_lib.scheduler import metahost_scheduler, scheduler_config
-from autotest_lib.scheduler import rdb, scheduler_models
+from autotest_lib.scheduler import scheduler_models
 from autotest_lib.site_utils.graphite import stats
 from autotest_lib.server.cros import provision
 
@@ -59,7 +58,12 @@
 
 
     @_timer.decorate
-    def _get_ready_hosts(self):
+    def _release_hosts(self):
+        """Release hosts to the RDB.
+
+        Release all hosts that are ready and are currently not being used by an
+        active hqe, and don't have a new special task scheduled against them.
+        """
         # Avoid any host with a currently active queue entry against it.
         hqe_join = ('LEFT JOIN afe_host_queue_entries AS active_hqe '
                     'ON (afe_hosts.id = active_hqe.host_id AND '
@@ -79,10 +83,27 @@
         hosts = scheduler_models.Host.fetch(
             joins='%s %s' % (hqe_join, special_task_join),
             where="active_hqe.host_id IS NULL AND new_tasks.host_id IS NULL "
+                  "AND afe_hosts.leased "
                   "AND NOT afe_hosts.locked "
                   "AND (afe_hosts.status IS NULL "
                           "OR afe_hosts.status = 'Ready')")
 
+        for ready_host in hosts:
+            ready_host.update_field('leased', 0)
+
+
+    @_timer.decorate
+    def _get_ready_hosts(self):
+        # We don't lose anything by re-doing these checks
+        # even though we release hosts on the same conditions.
+        # In the future we might have multiple clients that
+        # release_hosts and/or lock them independent of the
+        # scheduler tick.
+        hosts = scheduler_models.Host.fetch(
+            where="NOT afe_hosts.leased "
+                  "AND NOT afe_hosts.locked "
+                  "AND (afe_hosts.status IS NULL "
+                      "OR afe_hosts.status = 'Ready')")
         return dict((host.id, host) for host in hosts)
 
 
@@ -221,8 +242,7 @@
 
     @_timer.decorate
     def tick(self):
-        for metahost_scheduler in self._metahost_schedulers:
-            metahost_scheduler.tick()
+        self._release_hosts()
 
 
     def hosts_in_label(self, label_id):
@@ -375,100 +395,6 @@
         return True
 
 
-    def get_job_info(self, queue_entry):
-        """
-        Extract job information from a queue_entry/host-scheduler.
-
-        Unfortunately the information needed to choose hosts for a job
-        are split across several tables and not restricted to just the
-        hqe. At the very least we require the deps and acls of the job, but
-        we also need to know if the job has a host assigned to it. This method
-        consolidates this information into a light-weight dictionary that the
-        host_scheduler and the rdb can pass back and forth.
-
-        @param queue_entry: the queue_entry of the job we would like
-            information about.
-
-        @return: A dictionary containing 1. A set of deps 2. A set of acls
-            3. The host id of the host assigned to the hqe, or None.
-        """
-        job_id = queue_entry.job_id
-        host_id = queue_entry.host_id
-        job_deps = self._job_dependencies.get(job_id, set())
-        job_deps = set([dep for dep in job_deps if
-                       not provision.can_provision(self._labels[dep].name)])
-        job_acls = self._job_acls.get(job_id, set())
-
-        return {'deps': set(job_deps),
-                'acls': set(job_acls),
-                'host_id': host_id}
-
-
-    def schedule_entry(self, queue_entry):
-        """
-        Schedule a hqe aginst a host.
-
-        A hqe can either have a host assigned to it or not. In eithercase
-        however, actually scheduling the hqe on the host involves validating
-        the assignment by checking acls and labels. If the hqe doesn't have a
-        host we need to find a host before we can perform this validation.
-
-        If we successfully validate the host->hqe pairing, return the host. The
-        scheduler will not begin scheduling special tasks for the hqe until it
-        acquires a valid host.
-
-        @param queue_entry: The queue_entry that requires a host.
-        @return: The host assigned to the hqe, if any.
-        """
-        host_id = queue_entry.host_id
-        job_id = queue_entry.job_id
-        job_info = self.get_job_info(queue_entry)
-        host = None
-
-        if host_id:
-            host = self._hosts_available.get(host_id, None)
-
-            # TODO(beeps): Remove the need for 2 rdb calls. Ideally we should
-            # just do one call to validate the assignment, however, since we're
-            # currently still using the host_scheduler, we'd need to pass it
-            # as an argument to validate_host_assignment, which is less clean
-            # than just splitting this work into 2 calls.
-            host_info = rdb.get_host_info(self, host_id)
-
-            # If the host is either unavailable or in-eligible for this job,
-            # defer scheduling this queue_entry till the next tick.
-            if (host is None or not
-                rdb.validate_host_assignment(job_info, host_info)):
-                return None
-        else:
-            host = rdb.get_host(self, job_info)
-            if host is None:
-                return None
-            queue_entry.set_host(host)
-
-        # TODO(beeps): Make it so we don't need to set the hqe active status
-        # to remove a host from the active pool.
-        # A host will remain in the available pool for as long as its status
-        # is Ready and it is not referenced by an active hqe. The state of
-        # the host is not under our control, as it will only change to
-        # resetting etc whenever the prejob task starts. However, the hqe
-        # is theoretically active from the moment we assign a healthy host
-        # to it. Setting the host on an inactive hqe will not remove it
-        # from the available pool, leading to unnecessary scheduling
-        # overhead.
-        # Without this, we will process each hqe twice because it is still
-        # judged as 'new', and perform the host<->hqe assignment twice,
-        # because the host assigned to the hqe is still 'available', as
-        # the first prejob task only runs at the end of the next tick's
-        # handle_agents call. Note that the status is still 'Queued', and
-        # will remaing 'Queued' till an agent changes it.
-        queue_entry.update_field('active', True)
-
-        # The available_hosts dictionary determines our scheduling decisions
-        # for subsequent jobs processed in this tick.
-        self._hosts_available.pop(host.id)
-        logging.debug('Scheduling job: %s, Host %s', job_id, host.id)
-        return host
 
 
     def find_eligible_atomic_group(self, queue_entry):
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 3cacbfa..4c320e7 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -22,6 +22,8 @@
 from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
 from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
 from autotest_lib.scheduler import postjob_task, scheduler_logging_config
+from autotest_lib.scheduler import rdb_lib
+from autotest_lib.scheduler import rdb_utils
 from autotest_lib.scheduler import scheduler_models
 from autotest_lib.scheduler import status_server, scheduler_config
 from autotest_lib.server import autoserv_utils
@@ -817,10 +819,49 @@
 
 
     def _schedule_hostless_job(self, queue_entry):
+        """Schedule a hostless (suite) job.
+
+        @param queue_entry: The queue_entry representing the hostless job.
+        """
         self.add_agent_task(HostlessQueueTask(queue_entry))
         queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
 
 
+    def _schedule_host_job(self, host, queue_entry):
+        """Schedules a job on the given host.
+
+        1. Assign the host to the hqe, if it isn't already assigned.
+        2. Create a SpecialAgentTask for the hqe.
+        3. Activate the hqe.
+
+        @param queue_entry: The job to schedule.
+        @param host: The host to schedule the job on.
+        """
+        if self.host_has_agent(host):
+            host_agent_task = list(self._host_agents.get(host.id))[0].task
+            subject = 'Host with agents assigned to an HQE'
+            message = ('HQE: %s assigned host %s, but the host has '
+                       'agent: %s for queue_entry %s. The HQE '
+                       'will remain in a queued state till the '
+                       'the host is usable.' %
+                       (queue_entry, host.hostname, host_agent_task,
+                        host_agent_task.queue_entry))
+            email_manager.manager.enqueue_notify_email(subject, message)
+            queue_entry.set_host(None)
+            queue_entry.update_field('active', False)
+        else:
+            if queue_entry.host_id is None:
+                queue_entry.set_host(host)
+            else:
+                if host.id != queue_entry.host_id:
+                    raise rdb_utils.RDBException('The rdb returned host: %s '
+                            'but the job:%s was already assigned a host: %s. ' %
+                            (host.hostname, queue_entry.job_id,
+                            queue_entry.host.hostname))
+            queue_entry.update_field('active', True)
+            self._run_queue_entry(queue_entry)
+
+
     def _schedule_new_jobs(self):
         """
         Find any new HQEs and call schedule_pre_job_tasks for it.
@@ -833,55 +874,31 @@
         """
         queue_entries = self._refresh_pending_queue_entries()
 
+        key = 'scheduler.jobs_per_tick'
         new_hostless_jobs = 0
-        new_atomic_groups = 0
         new_jobs_with_hosts = 0
         new_jobs_need_hosts = 0
-
+        host_jobs = []
         logging.debug('Processing %d queue_entries', len(queue_entries))
-        for queue_entry in queue_entries:
-            self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
-            is_unassigned_atomic_group = (
-                    queue_entry.atomic_group_id is not None
-                    and queue_entry.host_id is None)
 
+        for queue_entry in queue_entries:
             if queue_entry.is_hostless():
-                self._log_extra_msg('Scheduling hostless job.')
                 self._schedule_hostless_job(queue_entry)
                 new_hostless_jobs = new_hostless_jobs + 1
-            elif is_unassigned_atomic_group:
-                self._schedule_atomic_group(queue_entry)
-                new_atmoic_groups = new_atomic_groups + 1
             else:
+                host_jobs.append(queue_entry)
                 new_jobs_need_hosts = new_jobs_need_hosts + 1
-                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
-                if assigned_host:
-                    # If we ever find ourselves in a position where a ready host
-                    # has an agent, roll back the host assignment and try again
-                    # next tick.
-                    if self.host_has_agent(assigned_host):
-                        host_agent_task = [host_agent.task for host_agent in
-                                           list(self._host_agents.get(
-                                                       assigned_host.id))][0]
-                        subject = 'Host with agents assigned to an HQE'
-                        message = ('HQE: %s assigned host %s, but the host has '
-                                   'agent: %s for queue_entry %s. The HQE '
-                                   'will remain in a queued state till the '
-                                   'the host is usable.' %
-                                   (queue_entry, assigned_host.hostname,
-                                    host_agent_task,
-                                    host_agent_task.queue_entry))
-                        email_manager.manager.enqueue_notify_email(subject, message)
-                        queue_entry.set_host(None)
-                        queue_entry.update_field('active', False)
-                    else:
-                        assert assigned_host.id == queue_entry.host_id
-                        self._run_queue_entry(queue_entry)
-                        new_jobs_with_hosts = new_jobs_with_hosts + 1
 
-        key = 'scheduler.jobs_per_tick'
         stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
-        stats.Gauge(key).send('new_atomic_groups', new_atomic_groups)
+        if not host_jobs:
+            return
+
+        hosts = rdb_lib.acquire_hosts(self._host_scheduler, host_jobs)
+        for host, queue_entry in zip(hosts, host_jobs):
+            if host:
+                self._schedule_host_job(host, queue_entry)
+                new_jobs_with_hosts = new_jobs_with_hosts + 1
+
         stats.Gauge(key).send('new_jobs_with_hosts', new_jobs_with_hosts)
         stats.Gauge(key).send('new_jobs_without_hosts',
                               new_jobs_need_hosts - new_jobs_with_hosts)
diff --git a/scheduler/monitor_db_functional_test.py b/scheduler/monitor_db_functional_test.py
index d5ace50..6efb58b 100755
--- a/scheduler/monitor_db_functional_test.py
+++ b/scheduler/monitor_db_functional_test.py
@@ -20,6 +20,8 @@
         # it arises in an important query
         _re_translator(r'GROUP_CONCAT\((.*?)\)', r'\1'),
         _re_translator(r'TRUNCATE TABLE', 'DELETE FROM'),
+        _re_translator(r'ISNULL\(([a-z,_]+)\)',
+                       r'ifnull(nullif(\1, NULL), \1) DESC'),
 )
 
 HqeStatus = models.HostQueueEntry.Status
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py
index 42ecf7d..3ca8f50 100755
--- a/scheduler/monitor_db_unittest.py
+++ b/scheduler/monitor_db_unittest.py
@@ -221,6 +221,7 @@
 
 
     def _run_scheduler(self):
+        self._dispatcher._host_scheduler.tick()
         for _ in xrange(2): # metahost scheduling can take two cycles
             self._dispatcher._schedule_new_jobs()
 
diff --git a/scheduler/rdb.py b/scheduler/rdb.py
index 8dc3bcd..a0c5250 100644
--- a/scheduler/rdb.py
+++ b/scheduler/rdb.py
@@ -1,111 +1,328 @@
+"""Rdb server module.
 """
-Rdb server module.
-"""
+
+import collections
 import logging
+
+import common
+
+from django.core import exceptions as django_exceptions
+from django.db.models import fields
+from django.db.models import Q
+from autotest_lib.frontend.afe import models
+from autotest_lib.scheduler import rdb_utils
 from autotest_lib.site_utils.graphite import stats
 
 
 _timer = stats.Timer('rdb')
 
-def _check_acls(job_acls, host_acls):
-    if job_acls is None or host_acls is None:
-        return False
-    return len(host_acls.intersection(job_acls))
 
-
-def _check_deps(job_deps, host_labels):
-    if job_deps is None or host_labels is None:
-        return False
-    return len(job_deps - host_labels) == 0
-
-
-def validate_host_assignment(job_info, host_info):
-    """ Validate this job<->host pairing.
-
-    @param job_info: Information about the job as determined by
-                     the client rdb module.
-    @param host_info: Information about the host as determined by
-                      get_host_info.
-
-    @return: True if the job<->host pairing is valid, False otherwise.
-             False, if we don't have enough information to make a decision.
-    """
-    one_time_host = host_info.get('invalid') and job_info.get('host_id')
-
-    return (_check_acls(job_info.get('acls'), host_info.get('acls')) and
-            _check_deps(job_info.get('deps'), host_info.get('labels')) and
-            not host_info.get('invalid', True) or one_time_host and
-            not host_info.get('locked', True))
-
-
-def get_host_info(host_scheduler, host_id):
-    """
-    Utility method to parse information about a host into a dictionary.
-
-    Ideally this can just return the Host object, but doing this has the
-    following advantages:
-        1. Changes to the schema will only require changes to this method.
-        2. We can reimplement this method to make use of django caching.
-        3. We can lock rows of the host table in a centralized location.
-
-    @param host_id: id of the host to get information about.
-    @return: A dictionary containing all information needed to make a
-             scheduling decision regarding this host.
-    """
-    acls = host_scheduler._host_acls.get(host_id, set())
-    labels = host_scheduler._host_labels.get(host_id, set())
-    host_info = {'labels': labels, 'acls': acls}
-    host = host_scheduler._hosts_available.get(host_id)
-    if host:
-        host_info.update({'locked': host.locked, 'invalid': host.invalid})
-    return host_info
-
-
-def _order_labels(host_scheduler, labels):
-    """Given a list of labels, order them by available host count.
-
-    To make a scheduling decision, we need a host that matches all dependencies
-    of a job, hence the most restrictive search space we can use is the list
-    of ready hosts that have the least frequent label.
-
-    @param labels: A list of labels. If no hosts are available in a label,
-                   it will be the first in this list.
-    """
-    label_count = [len(host_scheduler._label_hosts.get(label, []))
-                   for label in labels]
-    return [label_tuple[1] for label_tuple in sorted(zip(label_count, labels))]
-
-
-@_timer.decorate
-def get_host(host_scheduler, job_info):
-    """
-    Get a host matching the job's selection criterion.
-
-    - Get all hosts in rarest label.
-    - Check which ones are still usable.
-    - Return the first of these hosts that passes our validity checks.
-
-    @param job_info: A dictionary of job information needed to pick a host.
-
-    @return: A host object from the available_hosts map.
+# Query managers: Provide a layer of abstraction over the database by
+# encapsulating common query patterns used by the rdb.
+class BaseHostQueryManager(object):
+    """Base manager for host queries on all hosts.
     """
 
-    # A job must at least have one dependency (eg:'board:') in order for us to
-    # find a host for it. To do so we use 2 data structures of host_scheduler:
-    # - label to hosts map: to count label frequencies, and get hosts in a label
-    # - hosts_available map: to mark a host as used, as it would be difficult
-    #   to delete this host from all the label keys it has, in the label to
-    #   hosts map.
-    rarest_label = _order_labels(host_scheduler, job_info.get('deps'))[0]
+    host_objects = models.Host.objects
 
-    # TODO(beeps): Once we have implemented locking in afe_hosts make this:
-    # afe.models.Host.object.filter(locked).filter(acls).filter(labels)...
-    # where labels are chained according to frequency. Currently this will
-    # require a join across all hqes which could be expensive, and is
-    # unnecessary anyway since we need to move away from this scheduling model.
-    hosts_considered = host_scheduler._label_hosts.get(rarest_label, [])
-    for host_id in hosts_considered:
-        host = host_scheduler._hosts_available.get(host_id)
-        host_info = get_host_info(host_scheduler, host_id)
-        if host and validate_host_assignment(job_info, host_info):
-            return host
+
+    def update_hosts(self, host_ids, **kwargs):
+        """Update fields on hosts.
+
+        @param host_ids: A list of ids of hosts to update.
+        @param kwargs: A key value dictionary corresponding to column, value
+            in the host database.
+        """
+        self.host_objects.filter(id__in=host_ids).update(**kwargs)
+
+
+    @rdb_utils.return_rdb_host
+    def get_hosts(self, ids):
+        """Get host objects for the given ids.
+
+        @param ids: The ids for which we need host objects.
+
+        @returns: A list of RDBServerHostWrapper objects, ordered by host_id.
+        """
+        return self.host_objects.filter(id__in=ids).order_by('id')
+
+
+    @rdb_utils.return_rdb_host
+    def find_hosts(self, deps, acls):
+        """Finds valid hosts matching deps, acls.
+
+        @param deps: A list of dependencies to match.
+        @param acls: A list of acls, at least one of which must coincide with
+            an acl group the chosen host is in.
+
+        @return: A list of matching hosts available.
+        """
+        hosts_available = self.host_objects.filter(invalid=0)
+        queries = [Q(labels__id=dep) for dep in deps]
+        queries += [Q(aclgroup__id__in=acls)]
+        for query in queries:
+            hosts_available = hosts_available.filter(query)
+        return hosts_available
+
+
+class AvailableHostQueryManager(BaseHostQueryManager):
+    """Query manager for requests on un-leased, un-locked hosts.
+    """
+
+    host_objects = models.Host.leased_objects
+
+
+# Request Handlers: Used in conjunction with requests in rdb_utils, these
+# handlers acquire hosts for a request and record the acquisition in
+# a response_map dictionary keyed on the request itself, with the host/hosts
+# as values.
+class BaseHostRequestHandler(object):
+    """Handler for requests related to hosts, leased or unleased.
+
+    This class is only capable of blindly returning host information.
+    """
+
+    def __init__(self):
+        self.host_query_manager = BaseHostQueryManager()
+        self.response_map = {}
+
+
+    def update_response_map(self, request, response):
+        """Record a response for a request.
+
+        The response_map only contains requests that were either satisfied, or
+        that ran into an exception. Often this translates to reserving hosts
+        against a request. If the rdb hit an exception processing a request, the
+        exception gets recorded in the map for the client to reraise.
+
+        @param response: A response for the request.
+        @param request: The request that has reserved these hosts.
+
+        @raises RDBException: If an empty value is added to the map.
+        """
+        if not response:
+            raise rdb_utils.RDBException('response_map dict can only contain '
+                    'valid responses. Request %s, response %s is invalid.' %
+                     (request, response))
+        if self.response_map.get(request):
+            raise rdb_utils.RDBException('Request %s already has response %s '
+                                         'the rdb cannot return multiple '
+                                         'responses for the same request.' %
+                                         (request, response))
+        self.response_map[request] = response
+
+
+    def _record_exceptions(self, request, exceptions):
+        """Record a list of exceptions for a request.
+
+        @param request: The request for which the exceptions were hit.
+        @param exceptions: The exceptions hit while processing the request.
+        """
+        rdb_exceptions = [rdb_utils.RDBException(ex) for ex in exceptions]
+        self.update_response_map(request, rdb_exceptions)
+
+
+    def get_response(self):
+        """Convert all RDBServerHostWrapper objects to host info dictionaries.
+
+        @return: A dictionary mapping requests to a list of matching host_infos.
+        """
+        for request, response in self.response_map.iteritems():
+            self.response_map[request] = [reply.wire_format()
+                                          for reply in response]
+        return self.response_map
+
+
+    def update_hosts(self, update_requests):
+        """Updates host tables with a payload.
+
+        @param update_requests: A list of update requests, as defined in
+            rdb_utils.UpdateHostRequest.
+        """
+        # Last payload for a host_id wins in the case of conflicting requests.
+        unique_host_requests = {}
+        for request in update_requests:
+            if unique_host_requests.get(request.host_id):
+                unique_host_requests[request.host_id].update(request.payload)
+            else:
+                unique_host_requests[request.host_id] = request.payload
+
+        # Batch similar payloads so we can do them in one table scan.
+        similar_requests = {}
+        for host_id, payload in unique_host_requests.iteritems():
+            similar_requests.setdefault(payload, []).append(host_id)
+
+        # If fields of the update don't match columns in the database,
+        # record the exception in the response map. This also means later
+        # updates will get applied even if previous updates fail.
+        for payload, hosts in similar_requests.iteritems():
+            try:
+                response = self.host_query_manager.update_hosts(hosts, **payload)
+            except (django_exceptions.FieldError,
+                    fields.FieldDoesNotExist) as e:
+                for host in hosts:
+                    # Since update requests have a consistent hash this will map
+                    # to the same key as the original request.
+                    request = rdb_utils.UpdateHostRequest(
+                            host_id=host, payload=payload).get_request()
+                    self._record_exceptions(request, [e])
+
+
+    def batch_get_hosts(self, host_requests):
+        """Get hosts matching the requests.
+
+        This method does not acquire the hosts, i.e it reserves hosts against
+        requests leaving their leased state untouched.
+
+        @param host_requests: A list of requests, as defined in
+            rdb_utils.BaseHostRequest.
+        """
+        host_ids = set([request.host_id for request in host_requests])
+        host_map = {}
+
+        # This list will only contain available hosts if executed using
+        # an AvailableHostQueryManager.
+        for host in self.host_query_manager.get_hosts(host_ids):
+            host_map[host.id] = host
+        for request in host_requests:
+            if request.host_id in host_map:
+                self.update_response_map(request, [host_map[request.host_id]])
+            else:
+                logging.warning('rdb could not get host for request: %s, it '
+                                'is already leased or locked', request)
+
+
+class AvailableHostRequestHandler(BaseHostRequestHandler):
+    """Handler for requests related to available (unleased and unlocked) hosts.
+
+    This class is capable of acquiring or validating hosts for requests.
+    """
+
+
+    def __init__(self):
+        self.host_query_manager = AvailableHostQueryManager()
+        self.response_map = {}
+
+
+    def lease_hosts(self, hosts):
+        """Leases a list of hosts.
+
+        @param hosts: A list of hosts to lease.
+        """
+        requests = [rdb_utils.UpdateHostRequest(host_id=host.id,
+                payload={'leased': 1}).get_request() for host in hosts]
+        super(AvailableHostRequestHandler, self).update_hosts(requests)
+
+
+    @_timer.decorate
+    def batch_acquire_hosts(self, host_requests):
+        """Acquire hosts for a list of requests.
+
+        The act of acquisition involves finding and leasing a set of
+        hosts that match the parameters of a request. Each acquired
+        host is added to the response_map dictionary, as an
+        RDBServerHostWrapper.
+
+        @param host_requests: A list of requests to acquire hosts.
+        """
+        batched_host_request = collections.Counter(host_requests)
+        for request, count in batched_host_request.iteritems():
+            hosts = self.host_query_manager.find_hosts(
+                            request.deps, request.acls)
+            num_hosts = min(len(hosts), count)
+            if num_hosts:
+                # TODO(beeps): Only reserve hosts we have successfully leased.
+                self.lease_hosts(hosts[:num_hosts])
+                self.update_response_map(request, hosts[:num_hosts])
+            if num_hosts < count:
+                logging.warning('%s Unsatisfied rdb acquisition request:%s ',
+                                count-num_hosts, request)
+
+
+    @_timer.decorate
+    def batch_validate_hosts(self, requests):
+        """Validate requests with hosts.
+
+        Reserve all hosts, check each one for validity and discard invalid
+        request-host pairings. Lease the remaining hosts.
+
+        @param requests: A list of requests to validate.
+        """
+        # Multiple requests can have the same host (but different acls/deps),
+        # and multiple jobs can submit identical requests (same host_id,
+        # acls, deps). In both these cases the first request to check the host
+        # map wins, though in the second case it doesn't matter.
+        self.batch_get_hosts(set(requests))
+        for request in self.response_map.keys():
+            hosts = self.response_map[request]
+            if len(hosts) > 1:
+                raise rdb_utils.RDBException('Got multiple hosts for a single '
+                        'request. Hosts: %s, request %s.' % (hosts, request))
+            host = hosts[0]
+            if not ((request.acls.intersection(host.acls) or host.invalid) and
+                    request.deps.intersection(host.labels) == request.deps):
+                if request.host_id != host.id:
+                    raise rdb_utils.RDBException('Cannot assign a different '
+                            'host for requset: %s, it already has one: %s ' %
+                            (request, host.id))
+                del self.response_map[request]
+                logging.warning('Failed rdb validation request:%s ', request)
+
+        # TODO(beeps): Update acquired hosts with failed leases.
+        self.lease_hosts([hosts[0] for hosts in self.response_map.values()])
+
+
+# Request dispatchers: Create the appropriate request handler, send a list
+# of requests to one of its methods. The corresponding request handler in
+# rdb_lib must understand how to match each request with a response from a
+# dispatcher; the easiest way to achieve this is to return the response_map
+# attribute of the request handler, after making the appropriate requests.
+def get_hosts(host_requests):
+    """Get host information about the requested hosts.
+
+    @param host_requests: A list of requests as defined in BaseHostRequest.
+    @return: A dictionary mapping each request to a list of hosts.
+    """
+    rdb_handler = BaseHostRequestHandler()
+    rdb_handler.batch_get_hosts(host_requests)
+    return rdb_handler.get_response()
+
+
+def update_hosts(update_requests):
+    """Update hosts.
+
+    @param update_requests: A list of updates to host tables
+        as defined in UpdateHostRequest.
+    """
+    rdb_handler = BaseHostRequestHandler()
+    rdb_handler.update_hosts(update_requests)
+    return rdb_handler.get_response()
+
+
+def rdb_host_request_dispatcher(host_requests):
+    """Dispatcher for all host acquisition queries.
+
+    @param host_requests: A list of requests for acquiring hosts, as defined in
+        AcquireHostRequest.
+    @return: A dictionary mapping each request to a list of hosts, or
+        an empty list if none could satisfy the request. Eg:
+        {AcquireHostRequest.template: [host_info_dictionaries]}
+    """
+    validation_requests = []
+    require_hosts_requests = []
+
+    # Validation requests are made by a job scheduled against a specific host
+    # (eg: through the frontend) and only require the rdb to
+    # match the parameters of the host against the request. Acquisition
+    # requests are made by jobs that need hosts (eg: suites) and the rdb needs
+    # to find hosts matching the parameters of the request.
+    for request in host_requests:
+        if request.host_id:
+            validation_requests.append(request)
+        else:
+            require_hosts_requests.append(request)
+
+    rdb_handler = AvailableHostRequestHandler()
+    rdb_handler.batch_validate_hosts(validation_requests)
+    rdb_handler.batch_acquire_hosts(require_hosts_requests)
+    return rdb_handler.get_response()
diff --git a/scheduler/rdb_lib.py b/scheduler/rdb_lib.py
new file mode 100644
index 0000000..5d3432e
--- /dev/null
+++ b/scheduler/rdb_lib.py
@@ -0,0 +1,230 @@
+"""Performs translation between monitor_db and the rdb.
+"""
+import logging
+
+import common
+from autotest_lib.scheduler import rdb
+from autotest_lib.scheduler import rdb_utils
+from autotest_lib.server.cros import provision
+
+
+# RDB request managers: Call an rdb api_method with a list of RDBRequests, and
+# match the requests to the responses returned.
+class RDBRequestManager(object):
+    """Base request manager for RDB requests.
+
+    Each instance of a request manager is associated with one request, and
+    one api call. All subclasses maintain a queue of unexecuted requests,
+    and expose an api to add requests/retrieve the response for these requests.
+    """
+
+
+    def __init__(self, request, api_call):
+        """
+        @param request: A subclass of rdb_utils.RDBRequest. The manager can only
+            manage requests of one type.
+        @param api_call: The rdb api call this manager is expected to make.
+            A manager can only send requests of type request, to this api call.
+        """
+        self.request = request
+        self.api_call = api_call
+        self.request_queue = []
+
+
+    def add_request(self, **kwargs):
+        """Add an RDBRequest to the queue."""
+        self.request_queue.append(self.request(**kwargs).get_request())
+
+
+    def response(self):
+        """Execute the api call and return a response for each request.
+
+        Responses are yielded in the order the requests were queued.
+
+        @yield: A response for each request added to the queue after the
+            last invocation of response.
+        @raises RDBException: On iteration, if no requests have been queued.
+        """
+        if not self.request_queue:
+            raise rdb_utils.RDBException('No requests. Call add_request '
+                    'with the appropriate kwargs, before calling response.')
+
+        result = self.api_call(self.request_queue)
+        requests = self.request_queue
+        self.request_queue = []
+        for request in requests:
+            yield result.get(request) if result else None
+
+
+class BaseHostRequestManager(RDBRequestManager):
+    """Manager for batched get requests on hosts."""
+
+
+    def response(self):
+        """Yields a popped host from the returned host list."""
+
+        # As a side-effect of returning a host, this method also removes it
+        # from the list of hosts matched up against a request. Eg:
+        #    hqes: [hqe1, hqe2, hqe3]
+        #    client requests: [c_r1, c_r2, c_r3]
+        #    generate requests in rdb: [r1 (c_r1 and c_r2), r2]
+        #    and response {r1: [h1, h2], r2:[h3]}
+        # c_r1 and c_r2 need to get different hosts though they're the same
+        # request, because they're from different queue_entries.
+        for hosts in super(BaseHostRequestManager, self).response():
+            yield hosts.pop() if hosts else None
+
+
+# Scheduler host proxy: Convert host information returned by the rdb into
+# a client host object capable of proxying updates back to the rdb.
+class RDBClientHostWrapper(object):
+    """A wrapper for host information.
+
+    This wrapper is used whenever the queue entry needs direct access
+    to the host.
+    """
+
+    required_fields = set(['id', 'hostname', 'platform','labels',
+                           'acls', 'protection', 'dirty', 'status'])
+
+
+    def _update_attributes(self, new_attributes):
+        """Updates attributes based on an input dictionary.
+
+        Since reads are not proxied to the rdb this method caches updates to
+        the host tables as class attributes.
+
+        @param new_attributes: A dictionary of attributes to update.
+        """
+        for name, value in new_attributes.iteritems():
+            setattr(self, name, value)
+
+
+    def __init__(self, **kwargs):
+        if self.required_fields - set(kwargs.keys()):
+            raise rdb_utils.RDBException('Creating %s requires %s, got %s '
+                    % (self.__class__, self.required_fields, kwargs.keys()))
+        self._update_attributes(kwargs)
+        self.update_request_manager = RDBRequestManager(
+                rdb_utils.UpdateHostRequest, rdb.update_hosts)
+        self.dbg_str = ''
+
+
+    def _update(self, payload):
+        """Send an update to rdb, save the attributes of the payload locally.
+
+        @param payload: A dictionary of 'key': value updates for this host.
+
+        @raises RDBException: If the rdb returns a response for the update.
+        """
+        logging.info('Host %s in %s updating %s through rdb on behalf of: %s ',
+                     self.hostname, self.status, payload, self.dbg_str)
+        self.update_request_manager.add_request(host_id=self.id, payload=payload)
+        for response in self.update_request_manager.response():
+            if response:
+                raise rdb_utils.RDBException('Host %s unable to perform update '
+                        '%s through rdb on behalf of %s: %s' % (self.hostname,
+                        payload, self.dbg_str, response))
+        self._update_attributes(payload)
+
+
+    def set_status(self, status):
+        """Proxy for setting the status of a host via the rdb.
+
+        @param status: The new status.
+        """
+        self._update({'status': status})
+
+
+    def update_field(self, fieldname, value):
+        """Proxy for updating a field on the host.
+
+        @param fieldname: The fieldname as a string.
+        @param value: The value to assign to the field.
+        """
+        self._update({fieldname: value})
+
+
+    def platform_and_labels(self):
+        """Get the platform and labels on this host.
+
+        @return: A tuple of the platform name, and a list of other label names.
+        """
+        platform = self.platform
+        labels = [label for label in self.labels if label != platform]
+        return platform, labels
+
+
+# Adapters for scheduler specific objects: Convert job information to a
+# format more amenable to the rdb/rdb request managers.
+class JobQueryManager(object):
+    """A caching query manager for all job related information."""
+    def __init__(self, host_scheduler, queue_entries):
+
+        # TODO(beeps): Break the dependency on the host_scheduler,
+        # crbug.com/336934.
+        self.host_scheduler = host_scheduler
+        jobs = [queue_entry.job_id for queue_entry in queue_entries]
+        self._job_acls = self.host_scheduler._get_job_acl_groups(jobs)
+        self._job_deps = self.host_scheduler._get_job_dependencies(jobs)
+        self._labels = self.host_scheduler._get_labels(self._job_deps)
+
+
+    def get_job_info(self, queue_entry):
+        """Extract job information from a queue_entry/host-scheduler.
+
+        @param queue_entry: The queue_entry for which we need job information.
+
+        @return: A dictionary representing job related information.
+        """
+        job_id = queue_entry.job_id
+        job_deps = self._job_deps.get(job_id, [])
+        job_deps = [dep for dep in job_deps
+                    if not provision.can_provision(self._labels[dep].name)]
+        job_acls = self._job_acls.get(job_id, [])
+
+        return {'deps': job_deps, 'acls': job_acls,
+                'host_id': queue_entry.host_id}
+
+
+def acquire_hosts(host_scheduler, queue_entries):
+    """Acquire hosts for the list of queue_entries.
+
+    @param queue_entries: A list of queue_entries that need hosts.
+    @param host_scheduler: The host_scheduler object, needed to get job
+        information.
+
+    @yield: An RDBClientHostWrapper for each host acquired on behalf of a
+        queue_entry, or None if a host wasn't found.
+
+    @raises RDBException: If something goes wrong making the request.
+    """
+    job_query_manager = JobQueryManager(host_scheduler, queue_entries)
+    request_manager = BaseHostRequestManager(
+            rdb_utils.AcquireHostRequest, rdb.rdb_host_request_dispatcher)
+    for entry in queue_entries:
+        request_manager.add_request(**job_query_manager.get_job_info(entry))
+
+    for host in request_manager.response():
+        yield (RDBClientHostWrapper(**host)
+               if host else None)
+
+
+def get_hosts(host_ids):
+    """Get information about the hosts with ids in host_ids.
+
+    @param host_ids: A list of host_ids.
+
+    @return: A list of RDBClientHostWrapper objects.
+
+    @raises RDBException: If something goes wrong in making the request.
+    """
+    request_manager = BaseHostRequestManager(rdb_utils.HostRequest, rdb.get_hosts)
+    for host_id in host_ids:
+        request_manager.add_request(host_id=host_id)
+
+    hosts = []
+    for host in request_manager.response():
+        hosts.append(RDBClientHostWrapper(**host)
+                     if host else None)
+    return hosts
diff --git a/scheduler/rdb_utils.py b/scheduler/rdb_utils.py
new file mode 100644
index 0000000..37f8723
--- /dev/null
+++ b/scheduler/rdb_utils.py
@@ -0,0 +1,246 @@
+"""RDB utilities.
+
+Do not import rdb or autotest modules here to avoid cyclic dependencies.
+"""
+import itertools
+import collections
+
+
+class RDBException(Exception):
+    """Generic RDB exception."""
+
+    def wire_format(self):
+        """Convert the exception to a format better suited to an rpc response.
+        """
+        return str(self)
+
+
+class RDBRequestMeta(type):
+    """Metaclass for constructing rdb requests.
+
+    This meta class creates a read-only request template by combining the
+    request_arguments of all classes in the inheritance hierarchy into a namedtuple.
+    """
+    def __new__(cls, name, bases, dctn):
+        for base in bases:
+            try:
+                dctn['_request_args'].update(base._request_args)
+            except AttributeError:
+                pass
+        dctn['template'] = collections.namedtuple('template',
+                                                  dctn['_request_args'])
+        return type.__new__(cls, name, bases, dctn)
+
+
+# RDB Request classes: Used in conjunction with the request managers defined in
+# rdb_lib. Each class defines the set of fields the rdb needs to fulfill the
+# request, and a hashable request object the request managers use to identify
+# a response with a request.
+class RDBRequest(object):
+    """Base class for an rdb request.
+
+    All classes inheriting from RDBRequest will need to specify a list of
+    request_args necessary to create the request, and will in turn get a
+    request that the rdb understands.
+    """
+    __metaclass__ = RDBRequestMeta
+    __slots__ = set(['_request_args', '_request'])
+    _request_args = set([])
+
+
+    def __init__(self, **kwargs):
+        for key,value in kwargs.iteritems():
+            try:
+                hash(value)
+            except TypeError as e:
+                raise RDBException('All fields of a %s must be hashable. '
+                                   '%s: %s, %s failed this test.' %
+                                   (self.__class__, key, type(value), value))
+        try:
+            self._request = self.template(**kwargs)
+        except TypeError:
+            raise RDBException('Creating %s requires args %s, got %s ' %
+                    (self.__class__, self.template._fields, kwargs.keys()))
+
+
+    def get_request(self):
+        """Returns a request that the rdb understands.
+
+        @return: A named tuple with all the fields necessary to make a request.
+        """
+        return self._request
+
+
+class HashableDict(dict):
+    """A hashable dictionary.
+
+    This class assumes all values of the input dict are hashable.
+    """
+
+    def __hash__(self):
+        return hash(tuple(sorted(self.items())))
+
+
+class HostRequest(RDBRequest):
+    """Basic request for information about a single host.
+
+    Eg: HostRequest(host_id=x): Will return all information about host x.
+    """
+    _request_args =  set(['host_id'])
+
+
+class UpdateHostRequest(HostRequest):
+    """Defines requests to update hosts.
+
+    Eg:
+        UpdateHostRequest(host_id=x, payload={'afe_hosts_col_name': value}):
+            Will update column afe_hosts_col_name with the given value, for
+            the given host_id.
+
+    @raises RDBException: If the input arguments don't contain the expected
+        fields to make the request, or are of the wrong type.
+    """
+    _request_args = set(['payload'])
+
+
+    def __init__(self, **kwargs):
+        try:
+            kwargs['payload'] = HashableDict(kwargs['payload'])
+        except (KeyError, TypeError) as e:
+            raise RDBException('Creating %s requires args %s, got %s ' %
+                    (self.__class__, self.template._fields, kwargs.keys()))
+        super(UpdateHostRequest, self).__init__(**kwargs)
+
+
+class AcquireHostRequest(HostRequest):
+    """Defines requests to acquire hosts.
+
+    Eg:
+        AcquireHostRequest(host_id=None, deps=[d1, d2], acls=[a1, a2]): Will
+            acquire and return a host that matches the specified deps/acls.
+        AcquireHostRequest(host_id=x, deps=[d1, d2], acls=[a1, a2]) : Will
+            acquire and return host x, after checking deps/acls match.
+
+    @raises RDBException: If the input arguments don't contain the expected
+        fields to make a request, or are of the wrong type.
+    """
+    _request_args = set(['deps', 'acls'])
+
+
+    def __init__(self, **kwargs):
+        try:
+            kwargs['deps'] = frozenset(kwargs['deps'])
+            kwargs['acls'] = frozenset(kwargs['acls'])
+        except (KeyError, TypeError) as e:
+            raise RDBException('Creating %s requires args %s, got %s ' %
+                    (self.__class__, self.template._fields, kwargs.keys()))
+        super(AcquireHostRequest, self).__init__(**kwargs)
+
+
+# Custom iterators: Used by the rdb to lazily convert the iteration of a
+# queryset to a database query and return an appropriately formatted response.
+class RememberingIterator(object):
+    """An iterator capable of reproducing all values in the input generator.
+    """
+
+    #pylint: disable-msg=C0111
+    def __init__(self, gen):
+        self.current, self.history = itertools.tee(gen)
+        self.items = []
+
+
+    def __iter__(self):
+        return self
+
+
+    def next(self):
+        return self.current.next()
+
+
+    def get_all_items(self):
+        """Get all the items in the generator this object was created with.
+
+        @return: A list of items.
+        """
+        if not self.items:
+            self.items = list(self.history)
+        return self.items
+
+
+class LabelIterator(RememberingIterator):
+    """A RememberingIterator for labels.
+
+    Within the rdb any label/dependency comparisons are performed based on label
+    ids. However, the host object returned needs to contain label names instead.
+    This class returns the label id when iterated over, but a list of all label
+    names when accessed through get_all_items.
+    """
+
+
+    def next(self):
+        return super(LabelIterator, self).next().id
+
+
+    def get_all_items(self):
+        """Get all label names of the labels in the input generator.
+
+        @return: A list of label names.
+        """
+        return [label.name
+                for label in super(LabelIterator, self).get_all_items()]
+
+
+# Rdb host adapters: Help in making a raw database host object more amenable
+# to the classes and functions in the rdb and/or rdb clients.
+class RDBServerHostWrapper(object):
+    """A host wrapper for the raw database object.
+    """
+
+
+    def __init__(self, host):
+        self.id = host.id
+        self.hostname = host.hostname
+        self.status = host.status
+        self.protection = host.protection
+        self.dirty = host.dirty
+        self.invalid = host.invalid
+        self.labels = LabelIterator(
+                (label for label in host.labels.all()))
+        self.acls = RememberingIterator(
+                (acl.id for acl in host.aclgroup_set.all()))
+        platform = host.platform()
+        self.platform = platform.name if platform else None
+
+
+    def wire_format(self):
+        """Returns all information needed to schedule jobs on the host.
+
+        @return: A dictionary of host information.
+        """
+        host_info = {}
+        for key, value in self.__dict__.iteritems():
+            if isinstance(value, RememberingIterator):
+                host_info[key] = value.get_all_items()
+            else:
+                host_info[key] = value
+        return host_info
+
+
+def return_rdb_host(func):
+    """Decorator for functions that return a list of Host objects.
+
+    @param func: The decorated function.
+    @return: A function capable of converting each host_object to a
+        RDBServerHostWrapper.
+    """
+    def get_rdb_host(*args, **kwargs):
+        """Takes a list of hosts and returns a list of host_infos.
+
+        @param hosts: A list of hosts. Each host is assumed to contain
+            all the fields in a host_info defined above.
+        @return: A list of RDBServerHostWrappers, one per host, or an empty
+            list if no hosts were found.
+        """
+        hosts = func(*args, **kwargs)
+        return [RDBServerHostWrapper(host) for host in hosts]
+    return get_rdb_host
diff --git a/scheduler/scheduler_models.py b/scheduler/scheduler_models.py
index 61af052..2211cbd 100644
--- a/scheduler/scheduler_models.py
+++ b/scheduler/scheduler_models.py
@@ -24,6 +24,7 @@
 from autotest_lib.frontend.afe import models, model_attributes
 from autotest_lib.database import database_connection
 from autotest_lib.scheduler import drone_manager, email_manager
+from autotest_lib.scheduler import rdb_lib
 from autotest_lib.scheduler import scheduler_config
 from autotest_lib.site_utils.graphite import stats
 from autotest_lib.client.common_lib import control_data
@@ -361,7 +362,8 @@
 class Host(DBObject):
     _table_name = 'afe_hosts'
     _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
-               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
+               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty',
+               'leased')
     _timer = stats.Timer("scheduler_models.Host")
 
 
@@ -448,7 +450,8 @@
         self.job = Job(self.job_id)
 
         if self.host_id:
-            self.host = Host(self.host_id)
+            self.host = rdb_lib.get_hosts([self.host_id])[0]
+            self.host.dbg_str = self.get_dbg_str()
         else:
             self.host = None
 
@@ -537,6 +540,17 @@
         return 'no host'
 
 
+    def get_dbg_str(self):
+        """Get a debug string to identify this host queue entry.
+
+        @return: A string containing the hqe and job id.
+        """
+        try:
+            return 'HQE: %s, for job: %s' % (self.id, self.job_id)
+        except AttributeError as e:
+            return 'HQE has not been initialized yet: %s' % e
+
+
     def __str__(self):
         flags = []
         if self.active:
@@ -550,8 +564,9 @@
         flags_str = ','.join(flags)
         if flags_str:
             flags_str = ' [%s]' % flags_str
-        return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
-                                    self.status, flags_str)
+        return ("%s and host: %s has status:%s%s" %
+                (self.get_dbg_str(), self._get_hostname(), self.status,
+                 flags_str))
 
 
     @_timer.decorate