[autotest] RDB Refactor II + Request/Response API.
Scheduler Refactor:
1. Batched processing of jobs.
2. Rdb hits the database instead of going through host_scheduler.
3. Migration to add a leased column. The scheduler releases hosts
back to the rdb every tick.
4. Client rdb host that queue_entries use to track a host, instead
of a database model.
Establishes a basic request/response api for the rdb:
rdb_utils:
1. Requests: Assert the format and fields of some basic request types.
2. Helper client/server modules to communicate with the rdb.
rdb_lib:
1. Request managers for rdb methods:
a. Match request-response
b. Abstract the batching of requests.
2. JobQueryManager: Regulates database access for job information.
rdb:
1. QueryManagers: Regulate database access
2. RequestHandlers: Use query managers to get things done.
3. Dispatchers: Send incoming requests to the appropriate handlers.
Ignores wire formats.
TEST=unittests, functional verification.
BUG=chromium:314081, chromium:314083, chromium:314084
DEPLOY=scheduler, migrate
Change-Id: Id174c663c6e78295d365142751053eae4023116d
Reviewed-on: https://chromium-review.googlesource.com/183385
Reviewed-by: Prashanth B <beeps@chromium.org>
Commit-Queue: Prashanth B <beeps@chromium.org>
Tested-by: Prashanth B <beeps@chromium.org>
diff --git a/scheduler/rdb.py b/scheduler/rdb.py
index 8dc3bcd..a0c5250 100644
--- a/scheduler/rdb.py
+++ b/scheduler/rdb.py
@@ -1,111 +1,328 @@
+"""Rdb server module.
"""
-Rdb server module.
-"""
+
+import collections
import logging
+
+import common
+
+from django.core import exceptions as django_exceptions
+from django.db.models import fields
+from django.db.models import Q
+from autotest_lib.frontend.afe import models
+from autotest_lib.scheduler import rdb_utils
from autotest_lib.site_utils.graphite import stats
_timer = stats.Timer('rdb')
-def _check_acls(job_acls, host_acls):
- if job_acls is None or host_acls is None:
- return False
- return len(host_acls.intersection(job_acls))
-
-def _check_deps(job_deps, host_labels):
- if job_deps is None or host_labels is None:
- return False
- return len(job_deps - host_labels) == 0
-
-
-def validate_host_assignment(job_info, host_info):
- """ Validate this job<->host pairing.
-
- @param job_info: Information about the job as determined by
- the client rdb module.
- @param host_info: Information about the host as determined by
- get_host_info.
-
- @return: True if the job<->host pairing is valid, False otherwise.
- False, if we don't have enough information to make a decision.
- """
- one_time_host = host_info.get('invalid') and job_info.get('host_id')
-
- return (_check_acls(job_info.get('acls'), host_info.get('acls')) and
- _check_deps(job_info.get('deps'), host_info.get('labels')) and
- not host_info.get('invalid', True) or one_time_host and
- not host_info.get('locked', True))
-
-
-def get_host_info(host_scheduler, host_id):
- """
- Utility method to parse information about a host into a dictionary.
-
- Ideally this can just return the Host object, but doing this has the
- following advantages:
- 1. Changes to the schema will only require changes to this method.
- 2. We can reimplement this method to make use of django caching.
- 3. We can lock rows of the host table in a centralized location.
-
- @param host_id: id of the host to get information about.
- @return: A dictionary containing all information needed to make a
- scheduling decision regarding this host.
- """
- acls = host_scheduler._host_acls.get(host_id, set())
- labels = host_scheduler._host_labels.get(host_id, set())
- host_info = {'labels': labels, 'acls': acls}
- host = host_scheduler._hosts_available.get(host_id)
- if host:
- host_info.update({'locked': host.locked, 'invalid': host.invalid})
- return host_info
-
-
-def _order_labels(host_scheduler, labels):
- """Given a list of labels, order them by available host count.
-
- To make a scheduling decision, we need a host that matches all dependencies
- of a job, hence the most restrictive search space we can use is the list
- of ready hosts that have the least frequent label.
-
- @param labels: A list of labels. If no hosts are available in a label,
- it will be the first in this list.
- """
- label_count = [len(host_scheduler._label_hosts.get(label, []))
- for label in labels]
- return [label_tuple[1] for label_tuple in sorted(zip(label_count, labels))]
-
-
-@_timer.decorate
-def get_host(host_scheduler, job_info):
- """
- Get a host matching the job's selection criterion.
-
- - Get all hosts in rarest label.
- - Check which ones are still usable.
- - Return the first of these hosts that passes our validity checks.
-
- @param job_info: A dictionary of job information needed to pick a host.
-
- @return: A host object from the available_hosts map.
+# Query managers: Provide a layer of abstraction over the database by
+# encapsulating common query patterns used by the rdb.
+class BaseHostQueryManager(object):
+ """Base manager for host queries on all hosts.
"""
- # A job must at least have one dependency (eg:'board:') in order for us to
- # find a host for it. To do so we use 2 data structures of host_scheduler:
- # - label to hosts map: to count label frequencies, and get hosts in a label
- # - hosts_available map: to mark a host as used, as it would be difficult
- # to delete this host from all the label keys it has, in the label to
- # hosts map.
- rarest_label = _order_labels(host_scheduler, job_info.get('deps'))[0]
+ host_objects = models.Host.objects
- # TODO(beeps): Once we have implemented locking in afe_hosts make this:
- # afe.models.Host.object.filter(locked).filter(acls).filter(labels)...
- # where labels are chained according to frequency. Currently this will
- # require a join across all hqes which could be expensive, and is
- # unnecessary anyway since we need to move away from this scheduling model.
- hosts_considered = host_scheduler._label_hosts.get(rarest_label, [])
- for host_id in hosts_considered:
- host = host_scheduler._hosts_available.get(host_id)
- host_info = get_host_info(host_scheduler, host_id)
- if host and validate_host_assignment(job_info, host_info):
- return host
+
+ def update_hosts(self, host_ids, **kwargs):
+ """Update fields on a list of hosts.
+
+ @param host_ids: A list of ids of hosts to update.
+ @param kwargs: A key value dictionary corresponding to column, value
+ in the host database.
+ """
+ self.host_objects.filter(id__in=host_ids).update(**kwargs)
+
+
+ @rdb_utils.return_rdb_host
+ def get_hosts(self, ids):
+ """Get host objects for the given ids.
+
+ @param ids: The ids for which we need host objects.
+
+ @returns: A list of RDBServerHostWrapper objects, ordered by host_id.
+ """
+ return self.host_objects.filter(id__in=ids).order_by('id')
+
+
+ @rdb_utils.return_rdb_host
+ def find_hosts(self, deps, acls):
+ """Finds valid hosts matching deps, acls.
+
+ @param deps: A list of dependencies to match.
+ @param acls: A list of acls, at least one of which must coincide with
+ an acl group the chosen host is in.
+
+ @return: A list of matching hosts available.
+ """
+ hosts_available = self.host_objects.filter(invalid=0)
+ queries = [Q(labels__id=dep) for dep in deps]
+ queries += [Q(aclgroup__id__in=acls)]
+ for query in queries:
+ hosts_available = hosts_available.filter(query)
+ return hosts_available
+
+
+class AvailableHostQueryManager(BaseHostQueryManager):
+ """Query manager for requests on un-leased, un-locked hosts.
+ """
+
+ host_objects = models.Host.leased_objects
+
+
+# Request Handlers: Used in conjunction with requests in rdb_utils, these
+# handlers acquire hosts for a request and record the acquisition in
+# a response_map dictionary keyed on the request itself, with the host/hosts
+# as values.
+class BaseHostRequestHandler(object):
+ """Handler for requests related to hosts, leased or unleased.
+
+ This class is only capable of blindly returning host information.
+ """
+
+ def __init__(self):
+ self.host_query_manager = BaseHostQueryManager()
+ self.response_map = {}
+
+
+ def update_response_map(self, request, response):
+ """Record a response for a request.
+
+ The response_map only contains requests that were either satisfied, or
+ that ran into an exception. Often this translates to reserving hosts
+ against a request. If the rdb hit an exception processing a request, the
+ exception gets recorded in the map for the client to reraise.
+
+ @param response: A response for the request.
+ @param request: The request that has reserved these hosts.
+
+ @raises RDBException: If an empty value is added to the map.
+ """
+ if not response:
+ raise rdb_utils.RDBException('response_map dict can only contain '
+ 'valid responses. Request %s, response %s is invalid.' %
+ (request, response))
+ if self.response_map.get(request):
+ raise rdb_utils.RDBException('Request %s already has response %s '
+ 'the rdb cannot return multiple '
+ 'responses for the same request.' %
+ (request, response))
+ self.response_map[request] = response
+
+
+ def _record_exceptions(self, request, exceptions):
+ """Record a list of exceptions for a request.
+
+ @param request: The request for which the exceptions were hit.
+ @param exceptions: The exceptions hit while processing the request.
+ """
+ rdb_exceptions = [rdb_utils.RDBException(ex) for ex in exceptions]
+ self.update_response_map(request, rdb_exceptions)
+
+
+ def get_response(self):
+ """Convert all RDBServerHostWrapper objects to host info dictionaries.
+
+ @return: A dictionary mapping requests to a list of matching host_infos.
+ """
+ for request, response in self.response_map.iteritems():
+ self.response_map[request] = [reply.wire_format()
+ for reply in response]
+ return self.response_map
+
+
+ def update_hosts(self, update_requests):
+ """Updates host tables with a payload.
+
+ @param update_requests: A list of update requests, as defined in
+ rdb_utils.UpdateHostRequest.
+ """
+ # Last payload for a host_id wins in the case of conflicting requests.
+ unique_host_requests = {}
+ for request in update_requests:
+ if unique_host_requests.get(request.host_id):
+ unique_host_requests[request.host_id].update(request.payload)
+ else:
+ unique_host_requests[request.host_id] = request.payload
+
+ # Batch similar payloads so we can do them in one table scan.
+ similar_requests = {}
+ for host_id, payload in unique_host_requests.iteritems():
+ similar_requests.setdefault(payload, []).append(host_id)
+
+ # If fields of the update don't match columns in the database,
+ # record the exception in the response map. This also means later
+ # updates will get applied even if previous updates fail.
+ for payload, hosts in similar_requests.iteritems():
+ try:
+ response = self.host_query_manager.update_hosts(hosts, **payload)
+ except (django_exceptions.FieldError,
+ fields.FieldDoesNotExist) as e:
+ for host in hosts:
+ # Since update requests have a consistent hash this will map
+ # to the same key as the original request.
+ request = rdb_utils.UpdateHostRequest(
+ host_id=host, payload=payload).get_request()
+ self._record_exceptions(request, [e])
+
+
+ def batch_get_hosts(self, host_requests):
+ """Get hosts matching the requests.
+
+ This method does not acquire the hosts, i.e it reserves hosts against
+ requests leaving their leased state untouched.
+
+ @param host_requests: A list of requests, as defined in
+ rdb_utils.BaseHostRequest.
+ """
+ host_ids = set([request.host_id for request in host_requests])
+ host_map = {}
+
+ # This list will not contain unavailable (leased or locked) hosts if
+ # executed using an AvailableHostQueryManager.
+ for host in self.host_query_manager.get_hosts(host_ids):
+ host_map[host.id] = host
+ for request in host_requests:
+ if request.host_id in host_map:
+ self.update_response_map(request, [host_map[request.host_id]])
+ else:
+ logging.warning('rdb could not get host for request: %s, it '
+ 'is already leased or locked', request)
+
+
+class AvailableHostRequestHandler(BaseHostRequestHandler):
+ """Handler for requests related to available (unleased and unlocked) hosts.
+
+ This class is capable of acquiring or validating hosts for requests.
+ """
+
+
+ def __init__(self):
+ self.host_query_manager = AvailableHostQueryManager()
+ self.response_map = {}
+
+
+ def lease_hosts(self, hosts):
+ """Leases a list of hosts.
+
+ @param hosts: A list of hosts to lease.
+ """
+ requests = [rdb_utils.UpdateHostRequest(host_id=host.id,
+ payload={'leased': 1}).get_request() for host in hosts]
+ super(AvailableHostRequestHandler, self).update_hosts(requests)
+
+
+ @_timer.decorate
+ def batch_acquire_hosts(self, host_requests):
+ """Acquire hosts for a list of requests.
+
+ The act of acquisition involves finding and leasing a set of
+ hosts that match the parameters of a request. Each acquired
+ host is added to the response_map dictionary, as an
+ RDBServerHostWrapper.
+
+ @param host_requests: A list of requests to acquire hosts.
+ """
+ batched_host_request = collections.Counter(host_requests)
+ for request, count in batched_host_request.iteritems():
+ hosts = self.host_query_manager.find_hosts(
+ request.deps, request.acls)
+ num_hosts = min(len(hosts), count)
+ if num_hosts:
+ # TODO(beeps): Only reserve hosts we have successfully leased.
+ self.lease_hosts(hosts[:num_hosts])
+ self.update_response_map(request, hosts[:num_hosts])
+ if num_hosts < count:
+ logging.warning('%s Unsatisfied rdb acquisition request:%s ',
+ count-num_hosts, request)
+
+
+ @_timer.decorate
+ def batch_validate_hosts(self, requests):
+ """Validate requests with hosts.
+
+ Reserve all hosts, check each one for validity and discard invalid
+ request-host pairings. Lease the remaining hosts.
+
+ @param requests: A list of requests to validate.
+ """
+ # Multiple requests can have the same host (but different acls/deps),
+ # and multiple jobs can submit identical requests (same host_id,
+ # acls, deps). In both these cases the first request to check the host
+ # map wins, though in the second case it doesn't matter.
+ self.batch_get_hosts(set(requests))
+ for request in self.response_map.keys():
+ hosts = self.response_map[request]
+ if len(hosts) > 1:
+ raise rdb_utils.RDBException('Got multiple hosts for a single '
+ 'request. Hosts: %s, request %s.' % (hosts, request))
+ host = hosts[0]
+ if not ((request.acls.intersection(host.acls) or host.invalid) and
+ request.deps.intersection(host.labels) == request.deps):
+ if request.host_id != host.id:
+ raise rdb_utils.RDBException('Cannot assign a different '
+ 'host for requset: %s, it already has one: %s ' %
+ (request, host.id))
+ del self.response_map[request]
+ logging.warning('Failed rdb validation request:%s ', request)
+
+ # TODO(beeps): Update acquired hosts with failed leases.
+ self.lease_hosts([hosts[0] for hosts in self.response_map.values()])
+
+
+# Request dispatchers: Create the appropriate request handler, send a list
+# of requests to one of its methods. The corresponding request handler in
+# rdb_lib must understand how to match each request with a response from a
+# dispatcher, the easiest way to achieve this is to return the response_map
+# attribute of the request handler, after making the appropriate requests.
+def get_hosts(host_requests):
+ """Get host information about the requested hosts.
+
+ @param host_requests: A list of requests as defined in BaseHostRequest.
+ @return: A dictionary mapping each request to a list of hosts.
+ """
+ rdb_handler = BaseHostRequestHandler()
+ rdb_handler.batch_get_hosts(host_requests)
+ return rdb_handler.get_response()
+
+
+def update_hosts(update_requests):
+ """Update hosts.
+
+ @param update_requests: A list of updates to host tables
+ as defined in UpdateHostRequest.
+ """
+ rdb_handler = BaseHostRequestHandler()
+ rdb_handler.update_hosts(update_requests)
+ return rdb_handler.get_response()
+
+
+def rdb_host_request_dispatcher(host_requests):
+ """Dispatcher for all host acquisition queries.
+
+ @param host_requests: A list of requests for acquiring hosts, as defined in
+ AcquireHostRequest.
+ @return: A dictionary mapping each request to a list of hosts, or
+ an empty list if none could satisfy the request. Eg:
+ {AcquireHostRequest.template: [host_info_dictionaries]}
+ """
+ validation_requests = []
+ require_hosts_requests = []
+
+ # Validation requests are made by a job scheduled against a specific
+ # host (eg: through the frontend) and only require the rdb to
+ # match the parameters of the host against the request. Acquisition
+ # requests are made by jobs that need hosts (eg: suites) and the rdb needs
+ # to find hosts matching the parameters of the request.
+ for request in host_requests:
+ if request.host_id:
+ validation_requests.append(request)
+ else:
+ require_hosts_requests.append(request)
+
+ rdb_handler = AvailableHostRequestHandler()
+ rdb_handler.batch_validate_hosts(validation_requests)
+ rdb_handler.batch_acquire_hosts(require_hosts_requests)
+ return rdb_handler.get_response()