| # Copyright (c) 2014 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Rdb server module. |
| """ |
| |
| import collections |
| import logging |
| |
| import common |
| |
| from django.core import exceptions as django_exceptions |
| from django.db.models import fields |
| from django.db.models import Q |
| from autotest_lib.client.common_lib.cros.graphite import stats |
| from autotest_lib.frontend.afe import models |
| from autotest_lib.scheduler import rdb_cache_manager |
| from autotest_lib.scheduler import rdb_hosts |
| from autotest_lib.scheduler import rdb_requests |
| from autotest_lib.scheduler import rdb_utils |
| |
| |
| _timer = stats.Timer(rdb_utils.RDB_STATS_KEY) |
| |
| |
| # Qeury managers: Provide a layer of abstraction over the database by |
| # encapsulating common query patterns used by the rdb. |
| class BaseHostQueryManager(object): |
| """Base manager for host queries on all hosts. |
| """ |
| |
| host_objects = models.Host.objects |
| |
| |
| def update_hosts(self, host_ids, **kwargs): |
| """Update fields on a hosts. |
| |
| @param host_ids: A list of ids of hosts to update. |
| @param kwargs: A key value dictionary corresponding to column, value |
| in the host database. |
| """ |
| self.host_objects.filter(id__in=host_ids).update(**kwargs) |
| |
| |
| @rdb_hosts.return_rdb_host |
| def get_hosts(self, ids): |
| """Get host objects for the given ids. |
| |
| @param ids: The ids for which we need host objects. |
| |
| @returns: A list of RDBServerHostWrapper objects, ordered by host_id. |
| """ |
| return self.host_objects.filter(id__in=ids).order_by('id') |
| |
| |
| @rdb_hosts.return_rdb_host |
| def find_hosts(self, deps, acls): |
| """Finds valid hosts matching deps, acls. |
| |
| @param deps: A list of dependencies to match. |
| @param acls: A list of acls, at least one of which must coincide with |
| an acl group the chosen host is in. |
| |
| @return: A set of matching hosts available. |
| """ |
| hosts_available = self.host_objects.filter(invalid=0) |
| queries = [Q(labels__id=dep) for dep in deps] |
| queries += [Q(aclgroup__id__in=acls)] |
| for query in queries: |
| hosts_available = hosts_available.filter(query) |
| return set(hosts_available) |
| |
| |
| class AvailableHostQueryManager(BaseHostQueryManager): |
| """Query manager for requests on un-leased, un-locked hosts. |
| """ |
| |
| host_objects = models.Host.leased_objects |
| |
| |
| # Request Handlers: Used in conjunction with requests in rdb_utils, these |
| # handlers acquire hosts for a request and record the acquisition in |
| # an response_map dictionary keyed on the request itself, with the host/hosts |
| # as values. |
| class BaseHostRequestHandler(object): |
| """Handler for requests related to hosts, leased or unleased. |
| |
| This class is only capable of blindly returning host information. |
| """ |
| |
| def __init__(self): |
| self.host_query_manager = BaseHostQueryManager() |
| self.response_map = {} |
| |
| |
| def update_response_map(self, request, response): |
| """Record a response for a request. |
| |
| The response_map only contains requests that were either satisfied, or |
| that ran into an exception. Often this translates to reserving hosts |
| against a request. If the rdb hit an exception processing a request, the |
| exception gets recorded in the map for the client to reraise. |
| |
| @param response: A response for the request. |
| @param request: The request that has reserved these hosts. |
| |
| @raises RDBException: If an empty values is added to the map. |
| """ |
| if not response: |
| raise rdb_utils.RDBException('response_map dict can only contain ' |
| 'valid responses. Request %s, response %s is invalid.' % |
| (request, response)) |
| if self.response_map.get(request): |
| raise rdb_utils.RDBException('Request %s already has response %s ' |
| 'the rdb cannot return multiple ' |
| 'responses for the same request.' % |
| (request, response)) |
| self.response_map[request] = response |
| |
| |
| def _check_response_map(self): |
| """Verify that we never give the same host to different requests. |
| |
| @raises RDBException: If the same host is assigned to multiple requests. |
| """ |
| unique_hosts = set([]) |
| for request, response in self.response_map.iteritems(): |
| # Each value in the response map can only either be a list of |
| # RDBHosts or a list of RDBExceptions, not a mix of both. |
| if isinstance(response[0], rdb_hosts.RDBHost): |
| if any([host in unique_hosts for host in response]): |
| raise rdb_utils.RDBException( |
| 'Assigning the same host to multiple requests. New ' |
| 'hosts %s, request %s, response_map: %s' % |
| (response, request, self.response_map)) |
| else: |
| unique_hosts = unique_hosts.union(response) |
| |
| |
| def _record_exceptions(self, request, exceptions): |
| """Record a list of exceptions for a request. |
| |
| @param request: The request for which the exceptions were hit. |
| @param exceptions: The exceptions hit while processing the request. |
| """ |
| rdb_exceptions = [rdb_utils.RDBException(ex) for ex in exceptions] |
| self.update_response_map(request, rdb_exceptions) |
| |
| |
| def get_response(self): |
| """Convert all RDBServerHostWrapper objects to host info dictionaries. |
| |
| @return: A dictionary mapping requests to a list of matching host_infos. |
| |
| @raises RDBException: If the same host is assigned to multiple requests. |
| """ |
| self._check_response_map() |
| for request, response in self.response_map.iteritems(): |
| self.response_map[request] = [reply.wire_format() |
| for reply in response] |
| return self.response_map |
| |
| |
| def update_hosts(self, update_requests): |
| """Updates host tables with a payload. |
| |
| @param update_requests: A list of update requests, as defined in |
| rdb_requests.UpdateHostRequest. |
| """ |
| # Last payload for a host_id wins in the case of conflicting requests. |
| unique_host_requests = {} |
| for request in update_requests: |
| if unique_host_requests.get(request.host_id): |
| unique_host_requests[request.host_id].update(request.payload) |
| else: |
| unique_host_requests[request.host_id] = request.payload |
| |
| # Batch similar payloads so we can do them in one table scan. |
| similar_requests = {} |
| for host_id, payload in unique_host_requests.iteritems(): |
| similar_requests.setdefault(payload, []).append(host_id) |
| |
| # If fields of the update don't match columns in the database, |
| # record the exception in the response map. This also means later |
| # updates will get applied even if previous updates fail. |
| for payload, hosts in similar_requests.iteritems(): |
| try: |
| response = self.host_query_manager.update_hosts(hosts, **payload) |
| except (django_exceptions.FieldError, |
| fields.FieldDoesNotExist, ValueError) as e: |
| for host in hosts: |
| # Since update requests have a consistent hash this will map |
| # to the same key as the original request. |
| request = rdb_requests.UpdateHostRequest( |
| host_id=host, payload=payload).get_request() |
| self._record_exceptions(request, [e]) |
| |
| |
| def batch_get_hosts(self, host_requests): |
| """Get hosts matching the requests. |
| |
| This method does not acquire the hosts, i.e it reserves hosts against |
| requests leaving their leased state untouched. |
| |
| @param host_requests: A list of requests, as defined in |
| rdb_utils.BaseHostRequest. |
| """ |
| host_ids = set([request.host_id for request in host_requests]) |
| host_map = {} |
| |
| # This list will not contain available hosts if executed using |
| # an AvailableHostQueryManager. |
| for host in self.host_query_manager.get_hosts(host_ids): |
| host_map[host.id] = host |
| for request in host_requests: |
| if request.host_id in host_map: |
| self.update_response_map(request, [host_map[request.host_id]]) |
| else: |
| logging.warning('rdb could not get host for request: %s, it ' |
| 'is already leased or locked', request) |
| |
| |
| class AvailableHostRequestHandler(BaseHostRequestHandler): |
| """Handler for requests related to available (unleased and unlocked) hosts. |
| |
| This class is capable of acquiring or validating hosts for requests. |
| """ |
| |
| |
| def __init__(self): |
| self.host_query_manager = AvailableHostQueryManager() |
| self.cache = rdb_cache_manager.RDBHostCacheManager() |
| self.response_map = {} |
| self.unsatisfied_requests = 0 |
| self.leased_hosts_count = 0 |
| |
| |
| @_timer.decorate |
| def lease_hosts(self, hosts): |
| """Leases a list of hosts. |
| |
| @param hosts: A list of RDBServerHostWrapper instances to lease. |
| |
| @return: The list of RDBServerHostWrappers that were successfully |
| leased. |
| """ |
| #TODO(beeps): crbug.com/353183. |
| unleased_hosts = set(hosts) |
| leased_hosts = set([]) |
| for host in unleased_hosts: |
| try: |
| host.lease() |
| except rdb_utils.RDBException as e: |
| logging.error('Unable to lease host %s: %s', host.hostname, e) |
| else: |
| leased_hosts.add(host) |
| return list(leased_hosts) |
| |
| |
| @classmethod |
| def valid_host_assignment(cls, request, host): |
| """Check if a host, request pairing is valid. |
| |
| @param request: The request to match against the host. |
| @param host: An RDBServerHostWrapper instance. |
| |
| @return: True if the host, request assignment is valid. |
| |
| @raises RDBException: If the request already has another host_ids |
| associated with it. |
| """ |
| if request.host_id and request.host_id != host.id: |
| raise rdb_utils.RDBException( |
| 'Cannot assign a different host for request: %s, it ' |
| 'already has one: %s ' % (request, host.id)) |
| |
| # Getting all labels and acls might result in large queries, so |
| # bail early if the host is already leased. |
| if host.leased: |
| return False |
| # If a host is invalid it must be a one time host added to the |
| # afe specifically for this purpose, so it doesn't require acl checking. |
| acl_match = (request.acls.intersection(host.acls) or host.invalid) |
| label_match = (request.deps.intersection(host.labels) == request.deps) |
| return acl_match and label_match |
| |
| |
| @classmethod |
| def batch_requests(cls, requests): |
| """ Group similar requests, sort by priority and parent_job_id. |
| |
| @param requests: A list or unsorted, unordered requests. |
| |
| @return: A list of tuples of the form (request, number of occurances) |
| formed by counting the number of requests with the same acls/deps/ |
| priority in the input list of requests, and sorting by priority. |
| The order of this list ensures against priority inversion. |
| """ |
| sort_function = lambda request: (request[0].priority, |
| -request[0].parent_job_id) |
| return sorted(collections.Counter(requests).items(), key=sort_function, |
| reverse=True) |
| |
| |
| @rdb_cache_manager.memoize_hosts |
| def _acquire_hosts(self, request, hosts_required, **kwargs): |
| """Acquire hosts for a group of similar requests. |
| |
| Find and acquire hosts that can satisfy a group of requests. |
| 1. If the caching decorator doesn't pass in a list of matching hosts |
| via the MEMOIZE_KEY this method will directly check the database for |
| matching hosts. |
| 2. If all matching hosts are not leased for this request, the remaining |
| hosts are returned to the caching decorator, to place in the cache. |
| |
| @param hosts_required: Number of hosts required to satisfy request. |
| @param request: The request for hosts. |
| |
| @return: The list of excess matching hosts. |
| """ |
| hosts = kwargs.get(rdb_cache_manager.MEMOIZE_KEY, []) |
| if not hosts: |
| hosts = self.host_query_manager.find_hosts( |
| request.deps, request.acls) |
| |
| # <-----[:attempt_lease_hosts](evicted)--------> <-(returned, cached)-> |
| # | -leased_hosts- | -stale cached hosts- | -unleased matching- | |
| # --used this request---used by earlier request----------unused-------- |
| attempt_lease_hosts = min(len(hosts), hosts_required) |
| leased_host_count = 0 |
| if attempt_lease_hosts: |
| leased_hosts = self.lease_hosts(hosts[:attempt_lease_hosts]) |
| if leased_hosts: |
| self.update_response_map(request, leased_hosts) |
| |
| # [:attempt_leased_hosts] - leased_hosts will include hosts that |
| # failed leasing, most likely because they're already leased, so |
| # don't cache them again. |
| leased_host_count = len(leased_hosts) |
| failed_leasing = attempt_lease_hosts - leased_host_count |
| if failed_leasing > 0: |
| # For the sake of simplicity this calculation assumes that |
| # leasing only fails if there's a stale cached host already |
| # leased by a previous request, ergo, we can only get here |
| # through a cache hit. |
| line_length = len(hosts) |
| self.cache.stale_entries.append( |
| (float(failed_leasing)/line_length) * 100) |
| self.leased_hosts_count += leased_host_count |
| self.unsatisfied_requests += max(hosts_required - leased_host_count, 0) |
| # Cache the unleased matching hosts against the request. |
| return hosts[attempt_lease_hosts:] |
| |
| |
| @_timer.decorate |
| def batch_acquire_hosts(self, host_requests): |
| """Acquire hosts for a list of requests. |
| |
| The act of acquisition involves finding and leasing a set of hosts |
| that match the parameters of a request. Each acquired host is added |
| to the response_map dictionary as an RDBServerHostWrapper. |
| |
| @param host_requests: A list of requests to acquire hosts. |
| """ |
| distinct_requests = 0 |
| |
| logging.debug('Processing %s host acquisition requests', |
| len(host_requests)) |
| for request, count in self.batch_requests(host_requests): |
| self._acquire_hosts(request, count) |
| distinct_requests += 1 |
| |
| self.cache.record_stats() |
| logging.debug('Host acquisition stats: distinct requests: %s, leased ' |
| 'hosts: %s, unsatisfied requests: %s', distinct_requests, |
| self.leased_hosts_count, self.unsatisfied_requests) |
| stats.Gauge(rdb_utils.RDB_STATS_KEY).send('leased_hosts', |
| self.leased_hosts_count) |
| stats.Gauge(rdb_utils.RDB_STATS_KEY).send('unsatisfied_requests', |
| self.unsatisfied_requests) |
| |
| |
| @_timer.decorate |
| def batch_validate_hosts(self, requests): |
| """Validate requests with hosts. |
| |
| Reserve all hosts, check each one for validity and discard invalid |
| request-host pairings. Lease the remaining hsots. |
| |
| @param requests: A list of requests to validate. |
| |
| @raises RDBException: If multiple hosts or the wrong host is returned |
| for a response. |
| """ |
| # The following cases are possible for frontend requests: |
| # 1. Multiple requests for 1 host, with different acls/deps/priority: |
| # These form distinct requests because they hash differently. |
| # The response map will contain entries like: {r1: h1, r2: h1} |
| # after the batch_get_hosts call. There are 2 sub-cases: |
| # a. Same deps/acls, different priority: |
| # Since we sort the requests based on priority, the |
| # higher priority request r1, will lease h1. The |
| # validation of r2, h1 will fail because of the r1 lease. |
| # b. Different deps/acls, only one of which matches the host: |
| # The matching request will lease h1. The other host |
| # pairing will get dropped from the response map. |
| # 2. Multiple requests with the same acls/deps/priority and 1 host: |
| # These all have the same request hash, so the response map will |
| # contain: {r: h}, regardless of the number of r's. If this is not |
| # a valid host assignment it will get dropped from the response. |
| self.batch_get_hosts(set(requests)) |
| for request in sorted(self.response_map.keys(), |
| key=lambda request: request.priority, reverse=True): |
| hosts = self.response_map[request] |
| if len(hosts) > 1: |
| raise rdb_utils.RDBException('Got multiple hosts for a single ' |
| 'request. Hosts: %s, request %s.' % (hosts, request)) |
| if (self.valid_host_assignment(request, hosts[0]) and |
| self.lease_hosts(hosts)): |
| continue |
| del self.response_map[request] |
| logging.warning('Request %s was not able to lease host %s', |
| request, hosts[0]) |
| |
| |
| # Request dispatchers: Create the appropriate request handler, send a list |
| # of requests to one of its methods. The corresponding request handler in |
| # rdb_lib must understand how to match each request with a response from a |
| # dispatcher, the easiest way to achieve this is to returned the response_map |
| # attribute of the request handler, after making the appropriate requests. |
| def get_hosts(host_requests): |
| """Get host information about the requested hosts. |
| |
| @param host_requests: A list of requests as defined in BaseHostRequest. |
| @return: A dictionary mapping each request to a list of hosts. |
| """ |
| rdb_handler = BaseHostRequestHandler() |
| rdb_handler.batch_get_hosts(host_requests) |
| return rdb_handler.get_response() |
| |
| |
| def update_hosts(update_requests): |
| """Update hosts. |
| |
| @param update_requests: A list of updates to host tables |
| as defined in UpdateHostRequest. |
| """ |
| rdb_handler = BaseHostRequestHandler() |
| rdb_handler.update_hosts(update_requests) |
| return rdb_handler.get_response() |
| |
| |
| def rdb_host_request_dispatcher(host_requests): |
| """Dispatcher for all host acquisition queries. |
| |
| @param host_requests: A list of requests for acquiring hosts, as defined in |
| AcquireHostRequest. |
| @return: A dictionary mapping each request to a list of hosts, or |
| an empty list if none could satisfy the request. Eg: |
| {AcquireHostRequest.template: [host_info_dictionaries]} |
| """ |
| validation_requests = [] |
| require_hosts_requests = [] |
| |
| # Validation requests are made by a job scheduled against a specific host |
| # specific host (eg: through the frontend) and only require the rdb to |
| # match the parameters of the host against the request. Acquisition |
| # requests are made by jobs that need hosts (eg: suites) and the rdb needs |
| # to find hosts matching the parameters of the request. |
| for request in host_requests: |
| if request.host_id: |
| validation_requests.append(request) |
| else: |
| require_hosts_requests.append(request) |
| |
| rdb_handler = AvailableHostRequestHandler() |
| rdb_handler.batch_validate_hosts(validation_requests) |
| rdb_handler.batch_acquire_hosts(require_hosts_requests) |
| return rdb_handler.get_response() |