| # Copyright 2017 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Library providing an API to lucifer.""" |
| |
| import os |
| import logging |
| import pipes |
| import socket |
| import subprocess |
| |
| import common |
| from autotest_lib.client.bin import local_host |
| from autotest_lib.client.common_lib import global_config |
| from autotest_lib.scheduler.drone_manager import PidfileId |
| from autotest_lib.server.hosts import ssh_host |
| from autotest_lib.frontend.afe import models |
| |
logger = logging.getLogger(__name__)

# Global autotest configuration and the section that holds lucifer settings.
_config = global_config.global_config
_SECTION = 'LUCIFER'

# TODO(crbug.com/748234): Move these to shadow_config.ini
# See also drones.AUTOTEST_INSTALL_DIR
_ENV = '/usr/bin/env'  # launcher; lets callers prepend VAR=value arguments
_AUTOTEST_DIR = '/usr/local/autotest'
_JOB_REPORTER_PATH = os.path.join(_AUTOTEST_DIR, 'bin', 'job_reporter')
| |
| |
def is_lucifer_enabled():
    """Return True if lucifer is enabled.

    Lucifer is currently always enabled: the result is hard-coded and
    the config is not consulted.
    """
    return True
| |
| |
def is_enabled_for(level):
    """Report whether lucifer handles the given level.

    Lucifer must be enabled overall, and the 'lucifer_level' value in the
    LUCIFER config section must equal level, ignoring case.

    @param level: string, e.g. 'PARSING', 'GATHERING'
    @returns: bool
    """
    if is_lucifer_enabled():
        configured = _config.get_config_value(_SECTION, 'lucifer_level')
        return configured.upper() == level.upper()
    return False
| |
| |
def is_lucifer_owned(job):
    """Report whether job has already been handed off to lucifer.

    The check is whether job has a 'jobhandoff' attribute (presumably the
    reverse relation to a JobHandoff row -- verify against the model).

    @param job: frontend.afe.models.Job instance
    @returns: bool
    """
    assert isinstance(job, models.Job)
    owned = hasattr(job, 'jobhandoff')
    return owned
| |
| |
def is_lucifer_owned_by_id(job_id):
    """Report whether the job with the given id was handed to lucifer.

    @param job_id: id of a frontend.afe.models.Job
    @returns: bool, True when a JobHandoff row references the job
    """
    handoffs = models.JobHandoff.objects.filter(job_id=job_id)
    return handoffs.exists()
| |
| |
def is_split_job(hqe_id):
    """Report whether an HQE's job has HQEs in different execution groups.

    For example, if the given HQE has execution_subdir=foo and another HQE
    of the same job has execution_subdir=bar, return True.  The only
    situation where this happens is when provisioning fails in a
    multi-DUT job, which leaves each HQE in its own group.

    See https://bugs.chromium.org/p/chromium/issues/detail?id=811877

    @param hqe_id: HQE id
    @returns: bool
    """
    entry = models.HostQueueEntry.objects.get(id=hqe_id)
    job_entries = entry.job.hostqueueentry_set.all()
    try:
        _get_consistent_execution_path(job_entries)
    except ExecutionPathError:
        return True
    else:
        return False
| |
| |
# TODO(crbug.com/748234): This is temporary to enable toggling
# lucifer rollouts with an option.
def spawn_starting_job_handler(manager, job):
    """Spawn job_reporter to handle a job at the STARTING level.

    Pass all arguments by keyword.

    @param manager: scheduler.drone_manager.DroneManager instance
    @param job: Job instance
    @returns: Drone instance
    """
    manager = _DroneManager(manager)
    drone = manager.pick_drone_to_use()
    results_dir = _results_dir(manager, job)
    _spawn_job_reporter(manager, drone, results_dir, [
        # Job specific
        '--lucifer-level', 'STARTING',
        '--job-id', str(job.id),
        '--results-dir', results_dir,

        # STARTING specific
        '--execution-tag', _working_directory(job),
    ])
    return drone


# TODO(crbug.com/748234): This is temporary to enable toggling
# lucifer rollouts with an option.
def spawn_parsing_job_handler(manager, job, autoserv_exit, pidfile_id=None):
    """Spawn job_reporter to only parse the results of a job.

    Pass all arguments by keyword.

    @param manager: scheduler.drone_manager.DroneManager instance
    @param job: Job instance
    @param autoserv_exit: autoserv exit status (not used by this function)
    @param pidfile_id: PidfileId instance; when given, job_reporter runs on
            the drone that owns that pidfile, otherwise a drone is picked.
    @returns: Drone instance
    """
    manager = _DroneManager(manager)
    if pidfile_id is None:
        drone = manager.pick_drone_to_use()
    else:
        drone = manager.get_drone_for_pidfile(pidfile_id)
    results_dir = _results_dir(manager, job)
    _spawn_job_reporter(manager, drone, results_dir, [
        # Job specific
        # NOTE(review): parsing reuses the STARTING level together with
        # --parsing-only; confirm this is what job_reporter expects.
        '--job-id', str(job.id),
        '--lucifer-level', 'STARTING',
        '--parsing-only',
        '--results-dir', results_dir,
    ])
    return drone


def _spawn_job_reporter(manager, drone, results_dir, job_args):
    """Spawn job_reporter on drone and register it with the drone manager.

    @param manager: _DroneManager instance
    @param drone: Drone instance to run job_reporter on
    @param results_dir: job results directory (may be on a remote drone)
    @param job_args: list of job-specific job_reporter arguments
    """
    args = [
        _JOB_REPORTER_PATH,

        # General configuration
        '--jobdir', _get_jobdir(),
        '--lucifer-path', _get_lucifer_path(),
    ] + job_args
    gcp_creds = _get_gcp_creds()
    if gcp_creds:
        # args is an argv list, not a shell string: LocalDrone execs it
        # directly and RemoteDrone shell-quotes every element itself.
        # Quoting the value here as well would leave literal quote
        # characters in the environment variable.
        args = ['GOOGLE_APPLICATION_CREDENTIALS=%s' % gcp_creds] + args
    drone.spawn(_ENV, args,
                output_file=_prepare_output_file(drone, results_dir))
    drone.add_active_processes(1)
    manager.reorder_drone_queue()
    manager.register_pidfile_processes(
        os.path.join(results_dir, '.autoserv_execute'), 1)
| |
| |
# Name of the subdirectory of a job's results dir that holds lucifer logs.
_LUCIFER_DIR = 'lucifer'


def _prepare_output_file(drone, results_dir):
    """Create the lucifer log dir on drone and return the log file path.

    @param drone: Drone instance; the directory is created via drone.run()
    @param results_dir: job results directory (may be on a remote drone)
    @returns: path of job_reporter_output.log inside the lucifer log dir
    """
    log_dir = os.path.join(results_dir, _LUCIFER_DIR)
    # NOTE: 'mkdir' is not an absolute path even though Drone.run documents
    # that it must be; this relies on PATH lookup on the drone.
    drone.run('mkdir', ['-p', log_dir])
    return os.path.join(log_dir, 'job_reporter_output.log')
| |
| |
def _get_jobdir():
    """Return the 'jobdir' value from the LUCIFER config section."""
    jobdir = _config.get_config_value(_SECTION, 'jobdir')
    return jobdir
| |
| |
def _get_lucifer_path():
    """Return the path of the 'lucifer' binary in the configured binaries dir."""
    binaries_dir = _get_binaries_path()
    return os.path.join(binaries_dir, 'lucifer')
| |
| |
def _get_binaries_path():
    """Return the 'binaries_path' value from the LUCIFER config section."""
    option = 'binaries_path'
    return _config.get_config_value(_SECTION, option)
| |
| |
def _get_gcp_creds():
    """Return the configured path to GCP service account credentials.

    Falls back to the empty string when 'gcp_creds' is not configured;
    an empty result means no credentials will be used.
    """
    creds_path = _config.get_config_value(_SECTION, 'gcp_creds', default='')
    return creds_path
| |
| |
class _DroneManager(object):
    """Simplified facade over an old-style DroneManager."""

    def __init__(self, old_manager):
        """Wrap old_manager.

        @param old_manager: old style DroneManager
        """
        self._manager = old_manager

    def get_num_tests_failed(self, pidfile_id):
        """Return how many tests autoserv reported as failed in a pidfile.

        @param pidfile_id: PidfileId instance.
        @returns: int, or -1 when the pidfile has no failure count.
        """
        failed = self._manager.get_pidfile_contents(pidfile_id).num_tests_failed
        return -1 if failed is None else failed

    def get_drone_for_pidfile(self, pidfile_id):
        """Return the drone, wrapped in the simplified API, for a pidfile.

        @param pidfile_id: PidfileId instance.
        """
        old_drone = self._manager.get_drone_for_pidfile_id(pidfile_id)
        return _wrap_drone(old_drone)

    def pick_drone_to_use(self, num_processes=1):
        """Choose a drone and return it wrapped in the simplified API.

        Options may be passed to influence drone selection.

        @param num_processes: number of processes the drone is expected
                to run
        """
        return _wrap_drone(
            self._manager.pick_drone_to_use(num_processes=num_processes))

    def absolute_path(self, path):
        """Return the absolute path of a drone results path.

        The result may refer to a location on a remote drone.
        """
        return self._manager.absolute_path(path)

    def register_pidfile_processes(self, path, count):
        """Register the pidfile at path as covering count processes.

        Registering lets the drone manager check how many processes are
        still alive, which may in turn be used as a proxy for drone load
        when selecting drones.

        Exact semantics depend on the drone manager implementation.
        Currently pidfiles are tracked in memory to count processes and
        are rediscovered when the scheduler restarts, so pidfile tracking
        errors can be fixed by restarting the scheduler.
        """
        pidfile_id = PidfileId(path)
        old_manager = self._manager
        old_manager.register_pidfile(pidfile_id)
        # Sets the count through the old manager's private bookkeeping
        # (presumably no public setter exists -- verify before changing).
        old_manager._registered_pidfile_info[pidfile_id].num_processes = count

    def reorder_drone_queue(self):
        """Re-sort the drone queue after process counts change.

        Call this after Drone.add_active_processes().
        """
        self._manager.reorder_drone_queue()
| |
| |
def _wrap_drone(old_drone):
    """Wrap an old-style drone in the simplified Drone API.

    @param old_drone: old style drone; its _host attribute decides the
            wrapper type.
    @returns: LocalDrone for a LocalHost, RemoteDrone for an SSHHost.
    @raises TypeError: for any other host type.
    """
    host = old_drone._host
    if isinstance(host, local_host.LocalHost):
        return LocalDrone()
    if isinstance(host, ssh_host.SSHHost):
        return RemoteDrone(old_drone)
    raise TypeError('Drone has an unknown host type')
| |
| |
def _results_dir(manager, job):
    """Return the absolute results directory for job.

    @param manager: object providing absolute_path(), e.g. _DroneManager
    @param job: Job instance
    @returns: absolute path, possibly on a remote drone.
    """
    working_dir = _working_directory(job)
    return manager.absolute_path(working_dir)
| |
| |
def _working_directory(job):
    """Return the execution path shared by all of job's HQEs.

    @raises ExecutionPathError: if the HQEs disagree on the path.
    """
    job_entries = job.hostqueueentry_set.all()
    return _get_consistent_execution_path(job_entries)
| |
| |
def _get_consistent_execution_path(execution_entries):
    """Return the execution path shared by all execution entries.

    @param execution_entries: non-empty iterable of objects with an
            execution_path() method (callers pass a HostQueueEntry
            QuerySet).
    @returns: the common execution path.
    @raises ExecutionPathError: if any entry's path differs from the first
            entry's path.
    @raises IndexError: if execution_entries is empty.
    """
    # Materialize once.  Indexing and then slicing an unevaluated QuerySet
    # would issue two separate queries, and without an explicit ordering
    # they need not agree on which row is "first".  This also accepts
    # arbitrary iterables.
    entries = list(execution_entries)
    first_entry = entries[0]
    first_execution_path = first_entry.execution_path()
    for execution_entry in entries[1:]:
        execution_path = execution_entry.execution_path()
        if execution_path != first_execution_path:
            raise ExecutionPathError(
                '%s (%s) != %s (%s)'
                % (execution_path,
                   execution_entry,
                   first_execution_path,
                   first_entry))
    return first_execution_path
| |
| |
class ExecutionPathError(Exception):
    """Execution entries that should share one execution path do not.

    Raised by _get_consistent_execution_path().
    """
| |
| |
class Drone(object):
    """Simplified drone API.

    The methods here are placeholders that do nothing and return None;
    concrete drones override the ones they support.
    """

    def hostname(self):
        """Return the drone's hostname (None in this base class)."""
        return None

    def run(self, path, args):
        """Run a short-lived command to completion.

        @param path: absolute path of the program; it may live on a remote
                machine.
        @param args: list of arguments, not including path itself.

        stdin, stdout and stderr of the new process are /dev/null.  The
        process may or may not get its own session, and it must not try to
        acquire a controlling terminal.

        The API is deliberately narrow: it exists for setup work local to
        the drone, when the drone may be a remote machine.
        """
        return None

    def spawn(self, path, args, output_file):
        """Start an independent process without waiting for it.

        @param path: absolute path of the program; it may live on a remote
                machine.
        @param args: list of arguments, not including path itself.
        @param output_file: pathname that receives stdout and stderr; how
                it is interpreted (e.g. as a remote file) is up to the
                implementation.

        The process runs in its own session with stdin on /dev/null, and it
        must not try to acquire a controlling terminal.
        """
        return None

    def add_active_processes(self, count):
        """Record count additional active processes on this drone.

        The active process count may be used as a proxy for load when
        selecting drones.  Callers should also call
        _DroneManager.register_pidfile_processes() and
        _DroneManager.reorder_drone_queue().

        Exact semantics depend on the drone manager implementation.
        Currently one process stands for the workload of one autoserv or
        one job, and the count is recomputed every scheduler tick from the
        pidfiles the drone manager tracks, so a count added here only lasts
        for one tick.
        """
        return None
| |
| |
class LocalDrone(Drone):
    """Drone that runs commands on the local machine.

    add_active_processes() is not overridden, so the base-class no-op is
    used and local drones do not record extra process counts.
    """

    def hostname(self):
        """Return this machine's hostname."""
        return socket.gethostname()

    def run(self, path, args):
        """Run path with args and wait for it to exit.

        All three standard streams are /dev/null and the exit status is
        ignored.
        """
        argv = [path] + args
        with open(os.devnull, 'r+b') as devnull:
            subprocess.call(argv, stdin=devnull, stdout=devnull,
                            stderr=devnull)

    def spawn(self, path, args, output_file):
        """Start path in its own session via _spawn() without waiting."""
        argv = [path] + args
        _spawn(path, argv, output_file)
| |
| |
class RemoteDrone(Drone):
    """Drone that runs commands on another machine through SSH."""

    def __init__(self, drone):
        """Wrap an old-style drone whose _host is an SSHHost.

        @param drone: old style drone
        @raises TypeError: if drone._host is not an SSHHost
        """
        host = drone._host
        if not isinstance(host, ssh_host.SSHHost):
            raise TypeError('RemoteDrone must be passed a drone with SSHHost')
        self._drone = drone
        self._host = host

    def hostname(self):
        """Return the SSH host's hostname."""
        return self._host.hostname

    @staticmethod
    def _quoted_command(path, args):
        """Join path and args into one shell-quoted command string."""
        return ' '.join(pipes.quote(part) for part in [path] + args)

    def run(self, path, args):
        """Run the command remotely with all std streams on /dev/null."""
        command = self._quoted_command(path, args)
        self._host.run('%(cmd)s <%(null)s >%(null)s 2>&1'
                       % {'cmd': command, 'null': os.devnull})

    def spawn(self, path, args, output_file):
        """Start the command remotely in the background.

        stdout and stderr are appended to output_file on the remote host.
        Each SSH command already gets its own session, so no explicit
        setsid is done here.
        """
        self._host.run('%(cmd)s <%(null)s >>%(file)s 2>&1 &'
                       % {'cmd': self._quoted_command(path, args),
                          'file': pipes.quote(output_file),
                          'null': os.devnull})

    def add_active_processes(self, count):
        """Add count to the wrapped old drone's active_processes."""
        self._drone.active_processes += count
| |
| |
def _spawn(path, argv, output_file):
    """Spawn a new process in its own session.

    path must be an absolute path. The first item in argv should be
    path.

    In the calling process, this function returns once the intermediate
    child has been reaped; it does not wait for the spawned program.  The
    grandchild puts itself in its own session and execs path.

    The new process will have stdin opened to /dev/null and stdout,
    stderr opened to output_file (created if missing, appended to
    otherwise).

    If anything fails in a forked child before exec succeeds, that child
    reports the traceback on stderr when it can and exits immediately. It
    never returns into the caller's code.
    """
    logger.info('Spawning %r, %r, %r', path, argv, output_file)
    assert all(isinstance(arg, basestring) for arg in argv)
    pid = os.fork()
    if pid:
        # Reap the intermediate child; its exit status is not used.
        os.waitpid(pid, 0)
        return
    # Everything below runs in a forked copy of the caller.  It must never
    # return or raise back into the caller's code, or a duplicate of the
    # calling process would keep running.  os._exit() does not unwind, so
    # the finally clause only runs when something failed.
    try:
        # Double fork to reparent to init since monitor_db does not reap.
        if os.fork():
            os._exit(os.EX_OK)
        os.setsid()
        null_fd = os.open(os.devnull, os.O_RDONLY)
        os.dup2(null_fd, 0)
        os.close(null_fd)
        # 0o666 (masked by umask) rather than os.open's default of 0o777:
        # the log file should not be created executable.
        out_fd = os.open(output_file,
                         os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o666)
        os.dup2(out_fd, 1)
        os.dup2(out_fd, 2)
        os.close(out_fd)
        os.execv(path, argv)
    except BaseException:
        # Best effort: stderr is output_file if the dup2 above happened.
        try:
            import sys
            import traceback
            traceback.print_exc()
            sys.stderr.flush()
        except BaseException:
            pass  # Deliberate: we are about to exit regardless.
    finally:
        os._exit(127)