[autotest] RDB Refactor II + Request/Response API.

Scheduler Refactor:
1. Batched processing of jobs.
2. Rdb hits the database instead of going through host_scheduler.
3. Migration to add a leased column. The scheduler releases hosts
    back to the rdb every tick.
4. Client rdb host that queue_entries use to track a host, instead
    of a database model.

Establishes a basic request/response api for the rdb:
rdb_utils:
    1. Requests: Assert the format and fields of some basic request types.
    2. Helper client/server modules to communicate with the rdb.
rdb_lib:
    1. Request managers for rdb methods:
        a. Match request-response
        b. Abstract the batching of requests.
    2. JobQueryManager: Regulates database access for job information.
rdb:
    1. QueryManagers: Regulate database access
    2. RequestHandlers: Use query managers to get things done.
    3. Dispatchers: Send incoming requests to the appropriate handlers.
Ignores wire formats.

TEST=unittests, functional verification.
BUG=chromium:314081, chromium:314083, chromium:314084
DEPLOY=scheduler, migrate

Change-Id: Id174c663c6e78295d365142751053eae4023116d
Reviewed-on: https://chromium-review.googlesource.com/183385
Reviewed-by: Prashanth B <beeps@chromium.org>
Commit-Queue: Prashanth B <beeps@chromium.org>
Tested-by: Prashanth B <beeps@chromium.org>
diff --git a/scheduler/rdb.py b/scheduler/rdb.py
index 8dc3bcd..a0c5250 100644
--- a/scheduler/rdb.py
+++ b/scheduler/rdb.py
@@ -1,111 +1,328 @@
+"""Rdb server module.
 """
-Rdb server module.
-"""
+
+import collections
 import logging
+
+import common
+
+from django.core import exceptions as django_exceptions
+from django.db.models import fields
+from django.db.models import Q
+from autotest_lib.frontend.afe import models
+from autotest_lib.scheduler import rdb_utils
 from autotest_lib.site_utils.graphite import stats
 
 
 _timer = stats.Timer('rdb')
 
-def _check_acls(job_acls, host_acls):
-    if job_acls is None or host_acls is None:
-        return False
-    return len(host_acls.intersection(job_acls))
 
-
-def _check_deps(job_deps, host_labels):
-    if job_deps is None or host_labels is None:
-        return False
-    return len(job_deps - host_labels) == 0
-
-
-def validate_host_assignment(job_info, host_info):
-    """ Validate this job<->host pairing.
-
-    @param job_info: Information about the job as determined by
-                     the client rdb module.
-    @param host_info: Information about the host as determined by
-                      get_host_info.
-
-    @return: True if the job<->host pairing is valid, False otherwise.
-             False, if we don't have enough information to make a decision.
-    """
-    one_time_host = host_info.get('invalid') and job_info.get('host_id')
-
-    return (_check_acls(job_info.get('acls'), host_info.get('acls')) and
-            _check_deps(job_info.get('deps'), host_info.get('labels')) and
-            not host_info.get('invalid', True) or one_time_host and
-            not host_info.get('locked', True))
-
-
-def get_host_info(host_scheduler, host_id):
-    """
-    Utility method to parse information about a host into a dictionary.
-
-    Ideally this can just return the Host object, but doing this has the
-    following advantages:
-        1. Changes to the schema will only require changes to this method.
-        2. We can reimplement this method to make use of django caching.
-        3. We can lock rows of the host table in a centralized location.
-
-    @param host_id: id of the host to get information about.
-    @return: A dictionary containing all information needed to make a
-             scheduling decision regarding this host.
-    """
-    acls = host_scheduler._host_acls.get(host_id, set())
-    labels = host_scheduler._host_labels.get(host_id, set())
-    host_info = {'labels': labels, 'acls': acls}
-    host = host_scheduler._hosts_available.get(host_id)
-    if host:
-        host_info.update({'locked': host.locked, 'invalid': host.invalid})
-    return host_info
-
-
-def _order_labels(host_scheduler, labels):
-    """Given a list of labels, order them by available host count.
-
-    To make a scheduling decision, we need a host that matches all dependencies
-    of a job, hence the most restrictive search space we can use is the list
-    of ready hosts that have the least frequent label.
-
-    @param labels: A list of labels. If no hosts are available in a label,
-                   it will be the first in this list.
-    """
-    label_count = [len(host_scheduler._label_hosts.get(label, []))
-                   for label in labels]
-    return [label_tuple[1] for label_tuple in sorted(zip(label_count, labels))]
-
-
-@_timer.decorate
-def get_host(host_scheduler, job_info):
-    """
-    Get a host matching the job's selection criterion.
-
-    - Get all hosts in rarest label.
-    - Check which ones are still usable.
-    - Return the first of these hosts that passes our validity checks.
-
-    @param job_info: A dictionary of job information needed to pick a host.
-
-    @return: A host object from the available_hosts map.
+# Query managers: Provide a layer of abstraction over the database by
+# encapsulating common query patterns used by the rdb.
+class BaseHostQueryManager(object):
+    """Base manager for host queries on all hosts.
     """
 
-    # A job must at least have one dependency (eg:'board:') in order for us to
-    # find a host for it. To do so we use 2 data structures of host_scheduler:
-    # - label to hosts map: to count label frequencies, and get hosts in a label
-    # - hosts_available map: to mark a host as used, as it would be difficult
-    #   to delete this host from all the label keys it has, in the label to
-    #   hosts map.
-    rarest_label = _order_labels(host_scheduler, job_info.get('deps'))[0]
+    host_objects = models.Host.objects
 
-    # TODO(beeps): Once we have implemented locking in afe_hosts make this:
-    # afe.models.Host.object.filter(locked).filter(acls).filter(labels)...
-    # where labels are chained according to frequency. Currently this will
-    # require a join across all hqes which could be expensive, and is
-    # unnecessary anyway since we need to move away from this scheduling model.
-    hosts_considered = host_scheduler._label_hosts.get(rarest_label, [])
-    for host_id in hosts_considered:
-        host = host_scheduler._hosts_available.get(host_id)
-        host_info = get_host_info(host_scheduler, host_id)
-        if host and validate_host_assignment(job_info, host_info):
-            return host
+
+    def update_hosts(self, host_ids, **kwargs):
+        """Update fields on a list of hosts.
+
+        @param host_ids: A list of ids of hosts to update.
+        @param kwargs: A key value dictionary corresponding to column, value
+            in the host database.
+        """
+        self.host_objects.filter(id__in=host_ids).update(**kwargs)
+
+
+    @rdb_utils.return_rdb_host
+    def get_hosts(self, ids):
+        """Get host objects for the given ids.
+
+        @param ids: The ids for which we need host objects.
+
+        @returns: A list of RDBServerHostWrapper objects, ordered by host_id.
+        """
+        return self.host_objects.filter(id__in=ids).order_by('id')
+
+
+    @rdb_utils.return_rdb_host
+    def find_hosts(self, deps, acls):
+        """Finds valid hosts matching deps, acls.
+
+        @param deps: A list of dependencies to match.
+        @param acls: A list of acls, at least one of which must coincide with
+            an acl group the chosen host is in.
+
+        @return: A list of matching hosts available.
+        """
+        hosts_available = self.host_objects.filter(invalid=0)
+        queries = [Q(labels__id=dep) for dep in deps]
+        queries += [Q(aclgroup__id__in=acls)]
+        for query in queries:
+            hosts_available = hosts_available.filter(query)
+        return hosts_available
+
+
+class AvailableHostQueryManager(BaseHostQueryManager):
+    """Query manager for requests on un-leased, un-locked hosts.
+    """
+
+    host_objects = models.Host.leased_objects
+
+
+# Request Handlers: Used in conjunction with requests in rdb_utils, these
+# handlers acquire hosts for a request and record the acquisition in
+# a response_map dictionary keyed on the request itself, with the host/hosts
+# as values.
+class BaseHostRequestHandler(object):
+    """Handler for requests related to hosts, leased or unleased.
+
+    This class is only capable of blindly returning host information.
+    """
+
+    def __init__(self):
+        self.host_query_manager = BaseHostQueryManager()
+        self.response_map = {}
+
+
+    def update_response_map(self, request, response):
+        """Record a response for a request.
+
+        The response_map only contains requests that were either satisfied, or
+        that ran into an exception. Often this translates to reserving hosts
+        against a request. If the rdb hit an exception processing a request, the
+        exception gets recorded in the map for the client to reraise.
+
+        @param response: A response for the request.
+        @param request: The request that has reserved these hosts.
+
+        @raises RDBException: If an empty value is added to the map.
+        """
+        if not response:
+            raise rdb_utils.RDBException('response_map dict can only contain '
+                    'valid responses. Request %s, response %s is invalid.' %
+                     (request, response))
+        if self.response_map.get(request):
+            raise rdb_utils.RDBException('Request %s already has response %s '
+                                         'the rdb cannot return multiple '
+                                         'responses for the same request.' %
+                                         (request, response))
+        self.response_map[request] = response
+
+
+    def _record_exceptions(self, request, exceptions):
+        """Record a list of exceptions for a request.
+
+        @param request: The request for which the exceptions were hit.
+        @param exceptions: The exceptions hit while processing the request.
+        """
+        rdb_exceptions = [rdb_utils.RDBException(ex) for ex in exceptions]
+        self.update_response_map(request, rdb_exceptions)
+
+
+    def get_response(self):
+        """Convert all RDBServerHostWrapper objects to host info dictionaries.
+
+        @return: A dictionary mapping requests to a list of matching host_infos.
+        """
+        for request, response in self.response_map.iteritems():
+            self.response_map[request] = [reply.wire_format()
+                                          for reply in response]
+        return self.response_map
+
+
+    def update_hosts(self, update_requests):
+        """Updates host tables with a payload.
+
+        @param update_requests: A list of update requests, as defined in
+            rdb_utils.UpdateHostRequest.
+        """
+        # Last payload for a host_id wins in the case of conflicting requests.
+        unique_host_requests = {}
+        for request in update_requests:
+            if unique_host_requests.get(request.host_id):
+                unique_host_requests[request.host_id].update(request.payload)
+            else:
+                unique_host_requests[request.host_id] = request.payload
+
+        # Batch similar payloads so we can do them in one table scan.
+        similar_requests = {}
+        for host_id, payload in unique_host_requests.iteritems():
+            similar_requests.setdefault(payload, []).append(host_id)
+
+        # If fields of the update don't match columns in the database,
+        # record the exception in the response map. This also means later
+        # updates will get applied even if previous updates fail.
+        for payload, hosts in similar_requests.iteritems():
+            try:
+                response = self.host_query_manager.update_hosts(hosts, **payload)
+            except (django_exceptions.FieldError,
+                    fields.FieldDoesNotExist) as e:
+                for host in hosts:
+                    # Since update requests have a consistent hash this will map
+                    # to the same key as the original request.
+                    request = rdb_utils.UpdateHostRequest(
+                            host_id=host, payload=payload).get_request()
+                    self._record_exceptions(request, [e])
+
+
+    def batch_get_hosts(self, host_requests):
+        """Get hosts matching the requests.
+
+        This method does not acquire the hosts, i.e. it reserves hosts against
+        requests, leaving their leased state untouched.
+
+        @param host_requests: A list of requests, as defined in
+            rdb_utils.BaseHostRequest.
+        """
+        host_ids = set([request.host_id for request in host_requests])
+        host_map = {}
+
+        # This list will not contain unavailable (leased or locked) hosts if
+        # executed using an AvailableHostQueryManager.
+        for host in self.host_query_manager.get_hosts(host_ids):
+            host_map[host.id] = host
+        for request in host_requests:
+            if request.host_id in host_map:
+                self.update_response_map(request, [host_map[request.host_id]])
+            else:
+                logging.warning('rdb could not get host for request: %s, it '
+                                'is already leased or locked', request)
+
+
+class AvailableHostRequestHandler(BaseHostRequestHandler):
+    """Handler for requests related to available (unleased and unlocked) hosts.
+
+    This class is capable of acquiring or validating hosts for requests.
+    """
+
+
+    def __init__(self):
+        self.host_query_manager = AvailableHostQueryManager()
+        self.response_map = {}
+
+
+    def lease_hosts(self, hosts):
+        """Leases a list of hosts.
+
+        @param hosts: A list of hosts to lease.
+        """
+        requests = [rdb_utils.UpdateHostRequest(host_id=host.id,
+                payload={'leased': 1}).get_request() for host in hosts]
+        super(AvailableHostRequestHandler, self).update_hosts(requests)
+
+
+    @_timer.decorate
+    def batch_acquire_hosts(self, host_requests):
+        """Acquire hosts for a list of requests.
+
+        The act of acquisition involves finding and leasing a set of
+        hosts that match the parameters of a request. Each acquired
+        host is added to the response_map dictionary, as an
+        RDBServerHostWrapper.
+
+        @param host_requests: A list of requests to acquire hosts.
+        """
+        batched_host_request = collections.Counter(host_requests)
+        for request, count in batched_host_request.iteritems():
+            hosts = self.host_query_manager.find_hosts(
+                            request.deps, request.acls)
+            num_hosts = min(len(hosts), count)
+            if num_hosts:
+                # TODO(beeps): Only reserve hosts we have successfully leased.
+                self.lease_hosts(hosts[:num_hosts])
+                self.update_response_map(request, hosts[:num_hosts])
+            if num_hosts < count:
+                logging.warning('%s Unsatisfied rdb acquisition request:%s ',
+                                count-num_hosts, request)
+
+
+    @_timer.decorate
+    def batch_validate_hosts(self, requests):
+        """Validate requests with hosts.
+
+        Reserve all hosts, check each one for validity and discard invalid
+        request-host pairings. Lease the remaining hosts.
+
+        @param requests: A list of requests to validate.
+        """
+        # Multiple requests can have the same host (but different acls/deps),
+        # and multiple jobs can submit identical requests (same host_id,
+        # acls, deps). In both these cases the first request to check the host
+        # map wins, though in the second case it doesn't matter.
+        self.batch_get_hosts(set(requests))
+        for request in self.response_map.keys():
+            hosts = self.response_map[request]
+            if len(hosts) > 1:
+                raise rdb_utils.RDBException('Got multiple hosts for a single '
+                        'request. Hosts: %s, request %s.' % (hosts, request))
+            host = hosts[0]
+            if not ((request.acls.intersection(host.acls) or host.invalid) and
+                    request.deps.intersection(host.labels) == request.deps):
+                if request.host_id != host.id:
+                    raise rdb_utils.RDBException('Cannot assign a different '
+                            'host for requset: %s, it already has one: %s ' %
+                            (request, host.id))
+                del self.response_map[request]
+                logging.warning('Failed rdb validation request:%s ', request)
+
+        # TODO(beeps): Update acquired hosts with failed leases.
+        self.lease_hosts([hosts[0] for hosts in self.response_map.values()])
+
+
+# Request dispatchers: Create the appropriate request handler, send a list
+# of requests to one of its methods. The corresponding request handler in
+# rdb_lib must understand how to match each request with a response from a
+# dispatcher, the easiest way to achieve this is to return the response_map
+# attribute of the request handler, after making the appropriate requests.
+def get_hosts(host_requests):
+    """Get host information about the requested hosts.
+
+    @param host_requests: A list of requests as defined in BaseHostRequest.
+    @return: A dictionary mapping each request to a list of hosts.
+    """
+    rdb_handler = BaseHostRequestHandler()
+    rdb_handler.batch_get_hosts(host_requests)
+    return rdb_handler.get_response()
+
+
+def update_hosts(update_requests):
+    """Update hosts.
+
+    @param update_requests: A list of updates to host tables
+        as defined in UpdateHostRequest.
+    """
+    rdb_handler = BaseHostRequestHandler()
+    rdb_handler.update_hosts(update_requests)
+    return rdb_handler.get_response()
+
+
+def rdb_host_request_dispatcher(host_requests):
+    """Dispatcher for all host acquisition queries.
+
+    @param host_requests: A list of requests for acquiring hosts, as defined in
+        AcquireHostRequest.
+    @return: A dictionary mapping each satisfied request to a list of hosts;
+        requests that could not be satisfied are absent from it. Eg:
+        {AcquireHostRequest.template: [host_info_dictionaries]}
+    """
+    validation_requests = []
+    require_hosts_requests = []
+
+    # Validation requests are made by a job scheduled against a specific
+    # host (eg: through the frontend) and only require the rdb to
+    # match the parameters of the host against the request. Acquisition
+    # requests are made by jobs that need hosts (eg: suites) and the rdb needs
+    # to find hosts matching the parameters of the request.
+    for request in host_requests:
+        if request.host_id:
+            validation_requests.append(request)
+        else:
+            require_hosts_requests.append(request)
+
+    rdb_handler = AvailableHostRequestHandler()
+    rdb_handler.batch_validate_hosts(validation_requests)
+    rdb_handler.batch_acquire_hosts(require_hosts_requests)
+    return rdb_handler.get_response()