| # Copyright (c) 2014 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Rdb server module. |
| """ |
| |
| import collections |
| import logging |
| |
| import common |
| |
| from django.core import exceptions as django_exceptions |
| from django.db.models import fields |
| from django.db.models import Q |
| from autotest_lib.frontend.afe import models |
| from autotest_lib.scheduler import rdb_hosts |
| from autotest_lib.scheduler import rdb_requests |
| from autotest_lib.scheduler import rdb_utils |
| from autotest_lib.site_utils.graphite import stats |
| |
| |
# Timer shared by this module; its decorate attribute is applied to the
# batch acquire/validate methods of AvailableHostRequestHandler.
# NOTE(review): presumably reports timings to graphite under the 'rdb' key --
# confirm against site_utils.graphite.stats.
_timer = stats.Timer('rdb')
| |
| |
# Query managers: Provide a layer of abstraction over the database by
# encapsulating common query patterns used by the rdb.
class BaseHostQueryManager(object):
    """Query manager that runs host queries against all hosts.

    Every query goes through the host_objects manager, so a subclass can
    change which hosts are visible by overriding that attribute.
    """

    # Django manager used by every query in this class.
    host_objects = models.Host.objects


    def update_hosts(self, host_ids, **kwargs):
        """Write column values onto a group of hosts.

        @param host_ids: A list of ids of the hosts to update.
        @param kwargs: Column name, value pairs that are applied to every
            host whose id is in host_ids.
        """
        targets = self.host_objects.filter(id__in=host_ids)
        targets.update(**kwargs)


    @rdb_hosts.return_rdb_host
    def get_hosts(self, ids):
        """Look up the hosts with the given ids.

        @param ids: The ids of the hosts to fetch.

        @returns: The matching hosts ordered by id. The return_rdb_host
            decorator post-processes this queryset; callers receive
            RDBServerHostWrapper objects.
        """
        selected = self.host_objects.filter(id__in=ids)
        return selected.order_by('id')


    @rdb_hosts.return_rdb_host
    def find_hosts(self, deps, acls):
        """Find valid hosts that satisfy a set of deps and acls.

        @param deps: A list of label ids, all of which the host must have.
        @param acls: A list of acl group ids, at least one of which must
            contain the host.

        @return: A set of the matching hosts.
        """
        candidates = self.host_objects.filter(invalid=0)
        # One chained filter per dependency, so a host has to carry every
        # requested label to survive.
        for dep in deps:
            candidates = candidates.filter(Q(labels__id=dep))
        # A single acl filter, since membership in any one group is enough.
        candidates = candidates.filter(Q(aclgroup__id__in=acls))
        # The set collapses any host that the joins returned more than once.
        return set(candidates)
| |
| |
class AvailableHostQueryManager(BaseHostQueryManager):
    """Query manager for requests on un-leased, un-locked hosts.

    Inherits every query from BaseHostQueryManager and only swaps the
    manager that those queries run against.
    """

    # NOTE(review): despite its name, leased_objects is presumably the manager
    # restricted to un-leased, un-locked hosts -- confirm in afe models.
    host_objects = models.Host.leased_objects
| |
| |
# Request Handlers: Used in conjunction with requests in rdb_utils, these
# handlers acquire hosts for a request and record the acquisition in
# a response_map dictionary keyed on the request itself, with the host/hosts
# as values.
class BaseHostRequestHandler(object):
    """Handler for requests related to hosts, leased or unleased.

    This class is only capable of blindly returning host information.
    Responses (hosts or exceptions) are accumulated in response_map, keyed
    on the request, and serialized for the client by get_response.
    """

    def __init__(self):
        self.host_query_manager = BaseHostQueryManager()
        self.response_map = {}


    def update_response_map(self, request, response):
        """Record a response for a request.

        The response_map only contains requests that were either satisfied, or
        that ran into an exception. Often this translates to reserving hosts
        against a request. If the rdb hit an exception processing a request, the
        exception gets recorded in the map for the client to reraise.

        @param request: The request that has reserved these hosts.
        @param response: A non-empty response for the request.

        @raises RDBException: If an empty value is added to the map, or if
            the request already has a response recorded against it.
        """
        if not response:
            raise rdb_utils.RDBException('response_map dict can only contain '
                    'valid responses. Request %s, response %s is invalid.' %
                    (request, response))
        existing_response = self.response_map.get(request)
        if existing_response:
            # Report the response that is already on record, since that is
            # what the message describes, not the one being rejected.
            raise rdb_utils.RDBException('Request %s already has response %s '
                                         'the rdb cannot return multiple '
                                         'responses for the same request.' %
                                         (request, existing_response))
        self.response_map[request] = response


    def _record_exceptions(self, request, exceptions):
        """Record a list of exceptions for a request.

        Each exception is wrapped in an RDBException before being stored as
        the response for the request.

        @param request: The request for which the exceptions were hit.
        @param exceptions: The exceptions hit while processing the request.
        """
        rdb_exceptions = [rdb_utils.RDBException(ex) for ex in exceptions]
        self.update_response_map(request, rdb_exceptions)


    def get_response(self):
        """Convert every recorded reply to its wire format.

        The conversion happens in place: each value in response_map is
        replaced by the list of wire_format() results of its replies.

        @return: The response_map, mapping requests to a list of wire format
            replies (eg: host info dictionaries).
        """
        # Iterate over a snapshot, since values are reassigned inside the loop.
        for request, response in list(self.response_map.items()):
            self.response_map[request] = [reply.wire_format()
                                          for reply in response]
        return self.response_map


    def update_hosts(self, update_requests):
        """Updates host tables with a payload.

        Failures caused by payloads that don't match the host table are not
        raised; they are recorded in the response_map instead.

        @param update_requests: A list of update requests, as defined in
            rdb_requests.UpdateHostRequest.
        """
        # Last payload for a host_id wins in the case of conflicting requests.
        unique_host_requests = {}
        for request in update_requests:
            if unique_host_requests.get(request.host_id):
                # NOTE: this merges into the payload object of the first
                # request seen for the host, i.e that payload is mutated.
                unique_host_requests[request.host_id].update(request.payload)
            else:
                unique_host_requests[request.host_id] = request.payload

        # Batch similar payloads so we can do them in one table scan. Payloads
        # are used as dictionary keys here, so they need to be hashable.
        similar_requests = {}
        for host_id, payload in unique_host_requests.items():
            similar_requests.setdefault(payload, []).append(host_id)

        # If fields of the update don't match columns in the database,
        # record the exception in the response map. This also means later
        # updates will get applied even if previous updates fail.
        for payload, hosts in similar_requests.items():
            try:
                self.host_query_manager.update_hosts(hosts, **payload)
            except (django_exceptions.FieldError,
                    fields.FieldDoesNotExist, ValueError) as e:
                for host in hosts:
                    # Since update requests have a consistent hash this will map
                    # to the same key as the original request.
                    request = rdb_requests.UpdateHostRequest(
                            host_id=host, payload=payload).get_request()
                    self._record_exceptions(request, [e])


    def batch_get_hosts(self, host_requests):
        """Get hosts matching the requests.

        This method does not acquire the hosts, i.e it reserves hosts against
        requests leaving their leased state untouched.

        @param host_requests: A list of requests, as defined in
            rdb_utils.BaseHostRequest.
        """
        host_ids = set(request.host_id for request in host_requests)
        host_map = {}

        # This list will only contain available (un-leased, un-locked) hosts
        # if executed using an AvailableHostQueryManager.
        for host in self.host_query_manager.get_hosts(host_ids):
            host_map[host.id] = host
        for request in host_requests:
            if request.host_id in host_map:
                self.update_response_map(request, [host_map[request.host_id]])
            else:
                logging.warning('rdb could not get host for request: %s, it '
                                'is already leased or locked', request)
| |
| |
class AvailableHostRequestHandler(BaseHostRequestHandler):
    """Handler for requests related to available (unleased and unlocked) hosts.

    This class is capable of acquiring or validating hosts for requests.
    """


    def __init__(self):
        super(AvailableHostRequestHandler, self).__init__()
        # Route every query made by this handler through the manager for
        # available hosts instead of the base manager.
        self.host_query_manager = AvailableHostQueryManager()


    def lease_hosts(self, hosts):
        """Leases a list of hosts.

        Hosts that fail to lease with an RDBException are logged and left
        out of the result.

        @param hosts: A list of RDBServerHostWrapper instances to lease.

        @return: The list of RDBServerHostWrappers that were successfully
            leased. Duplicates are removed and the order is not preserved.
        """
        #TODO(beeps): crbug.com/353183.
        leased_hosts = set()
        # De-duplicate first, so each host is leased at most once.
        for host in set(hosts):
            try:
                host.lease()
            except rdb_utils.RDBException as e:
                logging.error('Unable to lease host %s: %s', host.hostname, e)
            else:
                leased_hosts.add(host)
        return list(leased_hosts)


    @classmethod
    def valid_host_assignment(cls, request, host):
        """Check if a host, request pairing is valid.

        @param request: The request to match against the host.
        @param host: An RDBServerHostWrapper instance.

        @return: True if the host, request assignment is valid, False
            otherwise.

        @raises RDBException: If the request already has another host_id
            associated with it.
        """
        if request.host_id and request.host_id != host.id:
            raise rdb_utils.RDBException(
                    'Cannot assign a different host for request: %s, it '
                    'already has one: %s ' % (request, host.id))

        # Getting all labels and acls might result in large queries, so
        # bail early if the host is already leased.
        if host.leased:
            return False
        # If a host is invalid it must be a one time host added to the
        # afe specifically for this purpose, so it doesn't require acl checking.
        acl_match = (request.acls.intersection(host.acls) or host.invalid)
        # Every dependency of the request has to be among the host's labels.
        label_match = (request.deps.intersection(host.labels) == request.deps)
        # Collapse the set/int operands into the boolean the contract promises.
        return bool(acl_match and label_match)


    @_timer.decorate
    def batch_acquire_hosts(self, host_requests):
        """Acquire hosts for a list of requests.

        The act of acquisition involves finding and leasing a set of
        hosts that match the parameters of a request. Each acquired
        host is added to the response_map dictionary, as an
        RDBServerHostWrapper. Requests that could not be fully satisfied
        are logged, not raised.

        @param host_requests: A list of requests to acquire hosts.
        """
        # Group similar requests and sort by priority, so we don't invert
        # priorities and lease hosts based on demand alone.
        batched_host_request = sorted(
                collections.Counter(host_requests).items(),
                key=lambda request: request[0].priority, reverse=True)

        for request, count in batched_host_request:
            hosts = self.host_query_manager.find_hosts(
                    request.deps, request.acls)
            num_hosts = min(len(hosts), count)
            if num_hosts:
                # Try leasing num hosts, but only update the response map with
                # those that we could successfully lease.
                leased_hosts = self.lease_hosts(hosts[:num_hosts])
                # The response map rejects empty responses with an exception,
                # so when every lease attempt failed leave the request out of
                # the map and fall through to the unsatisfied warning below.
                if leased_hosts:
                    self.update_response_map(request, leased_hosts)
                num_hosts = len(leased_hosts)
            if num_hosts < count:
                logging.warning('%s Unsatisfied rdb acquisition request:%s ',
                                count-num_hosts, request)


    @_timer.decorate
    def batch_validate_hosts(self, requests):
        """Validate requests with hosts.

        Reserve all hosts, check each one for validity and discard invalid
        request-host pairings. Lease the remaining hosts.

        @param requests: A list of requests to validate.

        @raises RDBException: If multiple hosts or the wrong host is returned
            for a response.
        """
        # The following cases are possible for frontend requests:
        # 1. Multiple requests for 1 host, with different acls/deps/priority:
        #    These form distinct requests because they hash differently.
        #    The response map will contain entries like: {r1: h1, r2: h1}
        #    after the batch_get_hosts call. There are 2 sub-cases:
        #        a. Same deps/acls, different priority:
        #           Since we sort the requests based on priority, the
        #           higher priority request r1, will lease h1. The
        #           validation of r2, h1 will fail because of the r1 lease.
        #        b. Different deps/acls, only one of which matches the host:
        #           The matching request will lease h1. The other host
        #           pairing will get dropped from the response map.
        # 2. Multiple requests with the same acls/deps/priority and 1 host:
        #    These all have the same request hash, so the response map will
        #    contain: {r: h}, regardless of the number of r's. If this is not
        #    a valid host assignment it will get dropped from the response.
        self.batch_get_hosts(set(requests))
        # sorted() returns a new list, so deleting entries from the
        # response_map inside the loop is safe.
        for request in sorted(self.response_map.keys(),
                              key=lambda request: request.priority,
                              reverse=True):
            hosts = self.response_map[request]
            if len(hosts) > 1:
                raise rdb_utils.RDBException('Got multiple hosts for a single '
                        'request. Hosts: %s, request %s.' % (hosts, request))
            # The lease is only attempted for valid pairings; a pairing that
            # is invalid or fails to lease is dropped from the response.
            if (self.valid_host_assignment(request, hosts[0]) and
                    self.lease_hosts(hosts)):
                continue
            del self.response_map[request]
            logging.warning('Request %s was not able to lease host %s',
                            request, hosts[0])
| |
| |
# Request dispatchers: Create the appropriate request handler, send a list
# of requests to one of its methods. The corresponding request handler in
# rdb_lib must understand how to match each request with a response from a
# dispatcher, the easiest way to achieve this is to return the response_map
# attribute of the request handler, after making the appropriate requests.
def get_hosts(host_requests):
    """Look up information about the hosts named in the requests.

    The hosts are only read through a BaseHostRequestHandler; they are not
    leased.

    @param host_requests: A list of requests as defined in BaseHostRequest.
    @return: A dictionary mapping each request to a list of hosts.
    """
    handler = BaseHostRequestHandler()
    handler.batch_get_hosts(host_requests)
    return handler.get_response()
| |
| |
def update_hosts(update_requests):
    """Apply a list of updates to the host tables.

    @param update_requests: A list of updates to host tables
                            as defined in UpdateHostRequest.
    @return: The response of the BaseHostRequestHandler that applied the
             updates.
    """
    handler = BaseHostRequestHandler()
    handler.update_hosts(update_requests)
    return handler.get_response()
| |
| |
def rdb_host_request_dispatcher(host_requests):
    """Dispatch host acquisition requests to the right handler method.

    Requests that already name a host are validated, the rest are handed
    to the acquisition path. Validation runs before acquisition.

    @param host_requests: A list of requests for acquiring hosts, as defined in
                          AcquireHostRequest.
    @return: The handler's response, a dictionary mapping requests to a list
             of hosts. Eg:
             {AcquireHostRequest.template: [host_info_dictionaries]}
    """
    with_host = []
    without_host = []

    # A request with a host_id comes from a job scheduled against a specific
    # host (eg: through the frontend), so the rdb only has to match the
    # parameters of that host against the request. A request without one
    # comes from a job that needs hosts (eg: suites), so the rdb has to find
    # hosts matching the parameters of the request.
    for request in host_requests:
        bucket = with_host if request.host_id else without_host
        bucket.append(request)

    handler = AvailableHostRequestHandler()
    handler.batch_validate_hosts(with_host)
    handler.batch_acquire_hosts(without_host)
    return handler.get_response()