| """ |
| Autotest client module for the rdb. |
| """ |
| |
| |
| import logging |
| |
| from autotest_lib.client.common_lib import global_config, utils |
| from autotest_lib.frontend.afe import models |
| from autotest_lib.scheduler import metahost_scheduler, scheduler_config |
| from autotest_lib.scheduler import rdb, scheduler_models |
| from autotest_lib.site_utils.graphite import stats |
| from autotest_lib.server.cros import provision |
| |
| |
| get_site_metahost_schedulers = utils.import_site_function( |
| __file__, 'autotest_lib.scheduler.site_metahost_scheduler', |
| 'get_metahost_schedulers', lambda : ()) |
| |
| |
| class SchedulerError(Exception): |
| """Raised by HostScheduler when an inconsistent state occurs.""" |
| |
| |
| class BaseHostScheduler(metahost_scheduler.HostSchedulingUtility): |
| """Handles the logic for choosing when to run jobs and on which hosts. |
| |
| This class makes several queries to the database on each tick, building up |
| some auxiliary data structures and using them to determine which hosts are |
    eligible to run which jobs, taking into account ACLs, labels, dependencies
    and atomic group membership.
| |
| In the past this was done with one or two very large, complex database |
| queries. It has proven much simpler and faster to build these auxiliary |
| data structures and perform the logic in Python. |
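
    A rough per-tick usage sketch (the exact call sequence is driven by the
    scheduler dispatcher and may differ; names here are illustrative):

        scheduler = HostScheduler(db)
        scheduler.recovery_on_startup()
        # on every tick:
        scheduler.tick()
        scheduler.refresh(pending_queue_entries)
        for queue_entry in pending_queue_entries:
            host = scheduler.schedule_entry(queue_entry)  # may return None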
| """ |
| |
| |
| _timer = stats.Timer('host_scheduler') |
| |
| |
| def __init__(self, db): |
| self._db = db |
| self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers() |
| |
        # Load any site-specific schedulers selected in global_config.
| site_schedulers_str = global_config.global_config.get_config_value( |
| scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers', |
| default='') |
| site_schedulers = set(site_schedulers_str.split(',')) |
| for scheduler in get_site_metahost_schedulers(): |
| if type(scheduler).__name__ in site_schedulers: |
| # always prepend, so site schedulers take precedence |
| self._metahost_schedulers = ( |
| [scheduler] + self._metahost_schedulers) |
| logging.info('Metahost schedulers: %s', |
| ', '.join(type(scheduler).__name__ for scheduler |
| in self._metahost_schedulers)) |
| |
| |
| @_timer.decorate |
| def _get_ready_hosts(self): |
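        """Return a dict of {host_id: Host} for hosts that can take new work.

        A host is considered ready when it is not locked, has no active host
        queue entry, and its status is NULL or 'Ready'.
        """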
| # avoid any host with a currently active queue entry against it |
| hosts = scheduler_models.Host.fetch( |
| joins='LEFT JOIN afe_host_queue_entries AS active_hqe ' |
| 'ON (afe_hosts.id = active_hqe.host_id AND ' |
| 'active_hqe.active)', |
| where="active_hqe.host_id IS NULL " |
| "AND NOT afe_hosts.locked " |
| "AND (afe_hosts.status IS NULL " |
| "OR afe_hosts.status = 'Ready')") |
| return dict((host.id, host) for host in hosts) |
| |
| |
| def _get_sql_id_list(self, id_list): |
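        """Render ids as a comma-separated string, e.g. [1, 2, 3] -> '1,2,3'."""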
| return ','.join(str(item_id) for item_id in id_list) |
| |
| |
| def _get_many2many_dict(self, query, id_list, flip=False): |
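        """Run a query whose single '%s' is filled with the given id list.

        @param query: SQL with one '%s' placeholder for a comma-separated
                id list.
        @param id_list: Ids substituted into the query; if empty, no query
                is run and {} is returned.
        @param flip: Passed through to _process_many2many_dict.
        @return: A dict of {left_id: set(right_ids)} built from the rows.
        """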
| if not id_list: |
| return {} |
| query %= self._get_sql_id_list(id_list) |
| rows = self._db.execute(query) |
| return self._process_many2many_dict(rows, flip) |
| |
| |
| def _process_many2many_dict(self, rows, flip=False): |
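        """Group two-column rows into a dict of {left_id: set(right_ids)}.

        For example (illustrative ids only):
            rows = [(1, 10), (1, 11), (2, 10)]
            -> {1: set([10, 11]), 2: set([10])}
        With flip=True the two columns are swapped before grouping:
            -> {10: set([1, 2]), 11: set([1])}
        """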
| result = {} |
| for row in rows: |
| left_id, right_id = int(row[0]), int(row[1]) |
| if flip: |
| left_id, right_id = right_id, left_id |
| result.setdefault(left_id, set()).add(right_id) |
| return result |
| |
| |
| @_timer.decorate |
| def _get_job_acl_groups(self, job_ids): |
| query = """ |
| SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id |
| FROM afe_jobs |
| INNER JOIN afe_users ON afe_users.login = afe_jobs.owner |
| INNER JOIN afe_acl_groups_users ON |
| afe_acl_groups_users.user_id = afe_users.id |
| WHERE afe_jobs.id IN (%s) |
| """ |
| return self._get_many2many_dict(query, job_ids) |
| |
| |
| @_timer.decorate |
| def _get_job_ineligible_hosts(self, job_ids): |
| query = """ |
| SELECT job_id, host_id |
| FROM afe_ineligible_host_queues |
| WHERE job_id IN (%s) |
| """ |
| return self._get_many2many_dict(query, job_ids) |
| |
| |
| @_timer.decorate |
| def _get_job_dependencies(self, job_ids): |
| query = """ |
| SELECT job_id, label_id |
| FROM afe_jobs_dependency_labels |
| WHERE job_id IN (%s) |
| """ |
| return self._get_many2many_dict(query, job_ids) |
| |
| |
| @_timer.decorate |
| def _get_host_acls(self, host_ids): |
| query = """ |
| SELECT host_id, aclgroup_id |
| FROM afe_acl_groups_hosts |
| WHERE host_id IN (%s) |
| """ |
| return self._get_many2many_dict(query, host_ids) |
| |
| |
| @_timer.decorate |
| def _get_label_hosts(self, host_ids): |
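        """Return (labels_to_hosts, hosts_to_labels) dicts for the given hosts.

        @param host_ids: Ids of the hosts whose label relationships to fetch.
        @return: A tuple of two dicts built from afe_hosts_labels:
                {label_id: set(host_ids)} and {host_id: set(label_ids)}.
        """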
| if not host_ids: |
| return {}, {} |
| query = """ |
| SELECT label_id, host_id |
| FROM afe_hosts_labels |
| WHERE host_id IN (%s) |
| """ % self._get_sql_id_list(host_ids) |
| rows = self._db.execute(query) |
| labels_to_hosts = self._process_many2many_dict(rows) |
| hosts_to_labels = self._process_many2many_dict(rows, flip=True) |
| return labels_to_hosts, hosts_to_labels |
| |
| |
| @_timer.decorate |
| def _get_labels(self, job_dependencies): |
| """ |
        Build a dict mapping label id to label object so that we don't have
        to round trip to the database every time we need a label.

        @param job_dependencies: A dict mapping an integer job id to a list of
            integer label ids, i.e. {job_id: [label_id]}.
        @return: A dict mapping an integer label id to a scheduler model label
            object, i.e. {label_id: label_object}.
| |
| """ |
| id_to_label = dict() |
| # Pull all the labels on hosts we might look at |
| host_labels = scheduler_models.Label.fetch( |
| where="id IN (SELECT label_id FROM afe_hosts_labels)") |
| id_to_label.update([(label.id, label) for label in host_labels]) |
| # and pull all the labels on jobs we might look at. |
| job_label_set = set() |
| for job_deps in job_dependencies.values(): |
| job_label_set.update(job_deps) |
        # In the unlikely case that no job has any dependency labels, we can
        # skip this.
| if job_label_set: |
| job_string_label_list = ','.join([str(x) for x in job_label_set]) |
| job_labels = scheduler_models.Label.fetch( |
| where="id IN (%s)" % job_string_label_list) |
| id_to_label.update([(label.id, label) for label in job_labels]) |
| return id_to_label |
| |
| |
| def recovery_on_startup(self): |
        for scheduler in self._metahost_schedulers:
            scheduler.recovery_on_startup()
| |
| |
| @_timer.decorate |
| def refresh(self, pending_queue_entries): |
| self._hosts_available = self._get_ready_hosts() |
| |
| relevant_jobs = [queue_entry.job_id |
| for queue_entry in pending_queue_entries] |
| self._job_acls = self._get_job_acl_groups(relevant_jobs) |
| self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs) |
| self._job_dependencies = self._get_job_dependencies(relevant_jobs) |
| |
| host_ids = self._hosts_available.keys() |
| self._host_acls = self._get_host_acls(host_ids) |
| self._label_hosts, self._host_labels = self._get_label_hosts(host_ids) |
| |
| self._labels = self._get_labels(self._job_dependencies) |
| |
| |
| @_timer.decorate |
| def tick(self): |
        for scheduler in self._metahost_schedulers:
            scheduler.tick()
| |
| |
| def hosts_in_label(self, label_id): |
| return set(self._label_hosts.get(label_id, ())) |
| |
| |
| def remove_host_from_label(self, host_id, label_id): |
| self._label_hosts[label_id].remove(host_id) |
| |
| |
| def pop_host(self, host_id): |
| return self._hosts_available.pop(host_id) |
| |
| |
| def ineligible_hosts_for_entry(self, queue_entry): |
| return set(self._ineligible_hosts.get(queue_entry.job_id, ())) |
| |
| |
| def _is_acl_accessible(self, host_id, queue_entry): |
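        """Return True if the job and host share at least one acl group."""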
| job_acls = self._job_acls.get(queue_entry.job_id, set()) |
| host_acls = self._host_acls.get(host_id, set()) |
| return len(host_acls.intersection(job_acls)) > 0 |
| |
| |
| def _check_job_dependencies(self, job_dependencies, host_labels): |
| missing = job_dependencies - host_labels |
| return len(missing) == 0 |
| |
| |
| def _check_only_if_needed_labels(self, job_dependencies, host_labels, |
| queue_entry): |
| if not queue_entry.meta_host: |
| # bypass only_if_needed labels when a specific host is selected |
| return True |
| |
| for label_id in host_labels: |
| label = self._labels[label_id] |
| if not label.only_if_needed: |
| # we don't care about non-only_if_needed labels |
| continue |
| if queue_entry.meta_host == label_id: |
| # if the label was requested in a metahost it's OK |
| continue |
| if label_id not in job_dependencies: |
| return False |
| return True |
| |
| |
| def _check_atomic_group_labels(self, host_labels, queue_entry): |
| """ |
| Determine if the given HostQueueEntry's atomic group settings are okay |
| to schedule on a host with the given labels. |
| |
| @param host_labels: A list of label ids that the host has. |
| @param queue_entry: The HostQueueEntry being considered for the host. |
| |
| @returns True if atomic group settings are okay, False otherwise. |
| """ |
| return (self._get_host_atomic_group_id(host_labels, queue_entry) == |
| queue_entry.atomic_group_id) |
| |
| |
| def _get_host_atomic_group_id(self, host_labels, queue_entry=None): |
| """ |
        Return the atomic group label id for a host with the given set of
        labels, or None if it has no atomic group labels. If more than one
        atomic group is found among the labels, an error is logged and one
        of the ids is returned arbitrarily.
| |
| @param host_labels: A list of label ids that the host has. |
| @param queue_entry: The HostQueueEntry we're testing. Only used for |
| extra info in a potential logged error message. |
| |
| @returns The id of the atomic group found on a label in host_labels |
| or None if no atomic group label is found. |
| """ |
| atomic_labels = [self._labels[label_id] for label_id in host_labels |
| if self._labels[label_id].atomic_group_id is not None] |
| atomic_ids = set(label.atomic_group_id for label in atomic_labels) |
| if not atomic_ids: |
| return None |
| if len(atomic_ids) > 1: |
| logging.error('More than one Atomic Group on HQE "%s" via: %r', |
| queue_entry, atomic_labels) |
| return atomic_ids.pop() |
| |
| |
| def _get_atomic_group_labels(self, atomic_group_id): |
| """ |
| Lookup the label ids that an atomic_group is associated with. |
| |
| @param atomic_group_id - The id of the AtomicGroup to look up. |
| |
        @returns A generator yielding Label ids for this atomic group.
| """ |
| return (id for id, label in self._labels.iteritems() |
| if label.atomic_group_id == atomic_group_id |
| and not label.invalid) |
| |
| |
| def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry): |
| """ |
| @param group_hosts - A sequence of Host ids to test for usability |
| and eligibility against the Job associated with queue_entry. |
| @param queue_entry - The HostQueueEntry that these hosts are being |
| tested for eligibility against. |
| |
| @returns A subset of group_hosts Host ids that are eligible for the |
| supplied queue_entry. |
| """ |
| return set(host_id for host_id in group_hosts |
| if self.is_host_usable(host_id) |
| and self.is_host_eligible_for_job(host_id, queue_entry)) |
| |
| |
| def is_host_eligible_for_job(self, host_id, queue_entry): |
| if self._is_host_invalid(host_id): |
| # if an invalid host is scheduled for a job, it's a one-time host |
| # and it therefore bypasses eligibility checks. note this can only |
| # happen for non-metahosts, because invalid hosts have their label |
| # relationships cleared. |
| return True |
| |
| job_dependencies = self._job_dependencies.get(queue_entry.job_id, set()) |
| # Remove provisionable labels from the set of job_dependencies that we |
| # need to satisfy |
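        # (i.e. labels that provision.can_provision reports it can apply on
        # the fly; the exact label names depend on the provision module).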
| job_dependencies = set([dep for dep in job_dependencies if |
| not provision.can_provision(self._labels[dep].name)]) |
| host_labels = self._host_labels.get(host_id, set()) |
| |
| return (self._is_acl_accessible(host_id, queue_entry) and |
| self._check_job_dependencies(job_dependencies, host_labels) and |
| self._check_only_if_needed_labels( |
| job_dependencies, host_labels, queue_entry) and |
| self._check_atomic_group_labels(host_labels, queue_entry)) |
| |
| |
| def _is_host_invalid(self, host_id): |
| host_object = self._hosts_available.get(host_id, None) |
| return host_object and host_object.invalid |
| |
| |
| def is_host_usable(self, host_id): |
| if host_id not in self._hosts_available: |
| # host was already used during this scheduling cycle |
| return False |
| if self._hosts_available[host_id].invalid: |
| # Invalid hosts cannot be used for metahosts. They're included in |
| # the original query because they can be used by non-metahosts. |
| return False |
| return True |
| |
| |
| def get_job_info(self, queue_entry): |
| """ |
        Extract the job information needed to choose a host for a queue_entry.

        Unfortunately the information needed to choose hosts for a job
        is split across several tables and not restricted to just the
        hqe. At the very least we require the deps and acls of the job, but
| we also need to know if the job has a host assigned to it. This method |
| consolidates this information into a light-weight dictionary that the |
| host_scheduler and the rdb can pass back and forth. |
| |
| @param queue_entry: the queue_entry of the job we would like |
| information about. |
| |
| @return: A dictionary containing 1. A set of deps 2. A set of acls |
| 3. The host id of the host assigned to the hqe, or None. |
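
        For example, the returned dict has the form (illustrative ids only):
            {'deps': set([20, 40]), 'acls': set([3]), 'host_id': None}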
| """ |
| job_id = queue_entry.job_id |
| host_id = queue_entry.host_id |
| job_deps = self._job_dependencies.get(job_id, set()) |
| job_deps = set([dep for dep in job_deps if |
| not provision.can_provision(self._labels[dep].name)]) |
| job_acls = self._job_acls.get(job_id, set()) |
| |
| return {'deps': set(job_deps), |
| 'acls': set(job_acls), |
| 'host_id': host_id} |
| |
| |
| def schedule_entry(self, queue_entry): |
| """ |
        Schedule an hqe against a host.

        An hqe can either have a host assigned to it or not. In either case,
        however, actually scheduling the hqe on the host involves validating
| the assignment by checking acls and labels. If the hqe doesn't have a |
| host we need to find a host before we can perform this validation. |
| |
| If we successfully validate the host->hqe pairing, return the host. The |
| scheduler will not begin scheduling special tasks for the hqe until it |
| acquires a valid host. |
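
        Roughly, the two paths through the rdb look like (sketch of the code
        below, not an external API reference):
            # hqe already has a host: fetch its info and validate the pairing
            host_info = rdb.get_host_info(self, host_id)
            rdb.validate_host_assignment(job_info, host_info)
            # hqe has no host yet: ask the rdb to pick one for this job
            host = rdb.get_host(self, job_info)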
| |
| @param queue_entry: The queue_entry that requires a host. |
| @return: The host assigned to the hqe, if any. |
| """ |
| host_id = queue_entry.host_id |
| job_id = queue_entry.job_id |
| job_info = self.get_job_info(queue_entry) |
| host = None |
| |
| if host_id: |
| host = self._hosts_available.get(host_id, None) |
| |
| # TODO(beeps): Remove the need for 2 rdb calls. Ideally we should |
| # just do one call to validate the assignment, however, since we're |
| # currently still using the host_scheduler, we'd need to pass it |
| # as an argument to validate_host_assignment, which is less clean |
| # than just splitting this work into 2 calls. |
| host_info = rdb.get_host_info(self, host_id) |
| |
            # If the host is either unavailable or ineligible for this job,
            # defer scheduling this queue_entry until the next tick.
| if (host is None or not |
| rdb.validate_host_assignment(job_info, host_info)): |
| return None |
| else: |
| host = rdb.get_host(self, job_info) |
| if host is None: |
| return None |
| queue_entry.set_host(host) |
| |
| # TODO(beeps): Make it so we don't need to set the hqe active status |
| # to remove a host from the active pool. |
| # A host will remain in the available pool for as long as its status |
| # is Ready and it is not referenced by an active hqe. The state of |
| # the host is not under our control, as it will only change to |
| # resetting etc whenever the prejob task starts. However, the hqe |
| # is theoretically active from the moment we assign a healthy host |
| # to it. Setting the host on an inactive hqe will not remove it |
| # from the available pool, leading to unnecessary scheduling |
| # overhead. |
| # Without this, we will process each hqe twice because it is still |
| # judged as 'new', and perform the host<->hqe assignment twice, |
| # because the host assigned to the hqe is still 'available', as |
| # the first prejob task only runs at the end of the next tick's |
| # handle_agents call. Note that the status is still 'Queued', and |
        # will remain 'Queued' until an agent changes it.
| queue_entry.update_field('active', True) |
| |
| # The available_hosts dictionary determines our scheduling decisions |
| # for subsequent jobs processed in this tick. |
| self._hosts_available.pop(host.id) |
| logging.debug('Scheduling job: %s, Host %s', job_id, host.id) |
| return host |
| |
| |
| def find_eligible_atomic_group(self, queue_entry): |
| """ |
| Given an atomic group host queue entry, locate an appropriate group |
| of hosts for the associated job to run on. |
| |
| The caller is responsible for creating new HQEs for the additional |
| hosts returned in order to run the actual job on them. |
| |
| @returns A list of Host instances in a ready state to satisfy this |
| atomic group scheduling. Hosts will all belong to the same |
| atomic group label as specified by the queue_entry. |
| An empty list will be returned if no suitable atomic |
| group could be found. |
| |
| TODO(gps): what is responsible for kicking off any attempted repairs on |
        a group of hosts? Not this function, but something needs to. We do
        not communicate the reason for returning [] outside of here...
| For now, we'll just be unschedulable if enough hosts within one group |
| enter Repair Failed state. |
| """ |
| assert queue_entry.atomic_group_id is not None |
| job = queue_entry.job |
| assert job.synch_count and job.synch_count > 0 |
| atomic_group = queue_entry.atomic_group |
| if job.synch_count > atomic_group.max_number_of_machines: |
| # Such a Job and HostQueueEntry should never be possible to |
| # create using the frontend. Regardless, we can't process it. |
| # Abort it immediately and log an error on the scheduler. |
| queue_entry.set_status(models.HostQueueEntry.Status.ABORTED) |
| logging.error( |
| 'Error: job %d synch_count=%d > requested atomic_group %d ' |
| 'max_number_of_machines=%d. Aborted host_queue_entry %d.', |
| job.id, job.synch_count, atomic_group.id, |
| atomic_group.max_number_of_machines, queue_entry.id) |
| return [] |
| hosts_in_label = self.hosts_in_label(queue_entry.meta_host) |
| ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry) |
| |
| # Look in each label associated with atomic_group until we find one with |
| # enough hosts to satisfy the job. |
| for atomic_label_id in self._get_atomic_group_labels(atomic_group.id): |
| group_hosts = set(self.hosts_in_label(atomic_label_id)) |
| if queue_entry.meta_host is not None: |
| # If we have a metahost label, only allow its hosts. |
| group_hosts.intersection_update(hosts_in_label) |
| group_hosts -= ineligible_host_ids |
| eligible_host_ids_in_group = self._get_eligible_host_ids_in_group( |
| group_hosts, queue_entry) |
| |
| # Job.synch_count is treated as "minimum synch count" when |
| # scheduling for an atomic group of hosts. The atomic group |
| # number of machines is the maximum to pick out of a single |
| # atomic group label for scheduling at one time. |
| min_hosts = job.synch_count |
| max_hosts = atomic_group.max_number_of_machines |
| |
| if len(eligible_host_ids_in_group) < min_hosts: |
| # Not enough eligible hosts in this atomic group label. |
| continue |
| |
| eligible_hosts_in_group = [self._hosts_available[id] |
| for id in eligible_host_ids_in_group] |
| # So that they show up in a sane order when viewing the job. |
| eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort) |
| |
| # Limit ourselves to scheduling the atomic group size. |
| if len(eligible_hosts_in_group) > max_hosts: |
| eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts] |
| |
| # Remove the selected hosts from our cached internal state |
| # of available hosts in order to return the Host objects. |
| host_list = [] |
| for host in eligible_hosts_in_group: |
| hosts_in_label.discard(host.id) |
| self._hosts_available.pop(host.id) |
| host_list.append(host) |
| return host_list |
| |
| return [] |
| |
| |
| site_host_scheduler = utils.import_site_class( |
| __file__, 'autotest_lib.scheduler.site_host_scheduler', |
| 'site_host_scheduler', BaseHostScheduler) |
| |
| |
| class HostScheduler(site_host_scheduler): |
| pass |