autotest: Delete drone_utility

Safe to delete: grep finds no remaining references to drone_utility.

Bug: 1033823
Change-Id: Id49e338651c1a0052f47886279b6a9e5b7ed8d93
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/third_party/autotest/+/1966850
Reviewed-by: Xixuan Wu <xixuan@chromium.org>
Tested-by: Allen Li <ayatane@chromium.org>
Commit-Queue: Allen Li <ayatane@chromium.org>
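
For context, the deleted module spoke a simple batched pickle protocol: a caller wrote a pickled list of _MethodCall objects to drone_utility's stdin and read back a pickled dict of results and warnings on stdout (see the module docstring, parse_input, and return_data below). A minimal caller-side sketch under those assumptions; run_calls is a hypothetical helper written for illustration, and the real drone_manager also invoked drone_utility on remote drones over ssh:

    import cPickle as pickle
    import subprocess

    from autotest_lib.scheduler import drone_utility

    def run_calls(calls):
        # Write the pickled batch to stdin; EOF tells parse_input to stop
        # reading. return_data prints a pickled
        # {'results': [...], 'warnings': [...]} dict on stdout.
        proc = subprocess.Popen(['scheduler/drone_utility.py'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE)
        output, _ = proc.communicate(pickle.dumps(calls))
        reply = pickle.loads(output)
        for warning in reply['warnings']:
            print 'drone_utility warning: %r' % (warning,)
        return reply['results']

    # Example: a batch with a single synchronous call.
    results = run_calls([drone_utility.call('refresh', [])])
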
diff --git a/scheduler/drone_utility.py b/scheduler/drone_utility.py
deleted file mode 100755
index b0fe22d..0000000
--- a/scheduler/drone_utility.py
+++ /dev/null
@@ -1,666 +0,0 @@
-#!/usr/bin/python2
-
-"""Utility module that executes management commands on the drone.
-
-1. This is the module responsible for orchestrating processes on a drone.
-2. It receives instructions via stdin and replies via stdout.
-3. Each invocation is responsible for the initiation of a set of batched calls.
-4. The batched calls may be synchronous or asynchronous.
-5. The caller is responsible for monitoring asynchronous calls through pidfiles.
-"""
-
-#pylint: disable-msg=missing-docstring
-
-import argparse
-import collections
-import datetime
-import getpass
-import itertools
-import logging
-import multiprocessing
-import os
-import pickle
-import shutil
-import signal
-import subprocess
-import sys
-import time
-import traceback
-
-import common
-
-from autotest_lib.client.common_lib import error
-from autotest_lib.client.common_lib import global_config
-from autotest_lib.client.common_lib import utils
-from autotest_lib.client.common_lib.cros import retry
-from autotest_lib.scheduler import scheduler_config
-from autotest_lib.server import subcommand
-
-
-# An environment variable we add to the environment to enable us to
-# distinguish processes we started from those that were started by
-# something else during recovery.  Name credit goes to showard. ;)
-DARK_MARK_ENVIRONMENT_VAR = 'AUTOTEST_SCHEDULER_DARK_MARK'
-
-_TEMPORARY_DIRECTORY = 'drone_tmp'
-_TRANSFER_FAILED_FILE = '.transfer_failed'
-
-# script and log file for cleaning up orphaned lxc containers.
-LXC_CLEANUP_SCRIPT = os.path.join(common.autotest_dir, 'site_utils',
-                                  'lxc_cleanup.py')
-LXC_CLEANUP_LOG_FILE = os.path.join(common.autotest_dir, 'logs',
-                                    'lxc_cleanup.log')
-
-
-class _MethodCall(object):
-    def __init__(self, method, args, kwargs):
-        self._method = method
-        self._args = args
-        self._kwargs = kwargs
-
-
-    def execute_on(self, drone_utility):
-        method = getattr(drone_utility, self._method)
-        return method(*self._args, **self._kwargs)
-
-
-    def __str__(self):
-        args = ', '.join(repr(arg) for arg in self._args)
-        kwargs = ', '.join('%s=%r' % (key, value) for key, value in
-                           self._kwargs.iteritems())
-        full_args = ', '.join(item for item in (args, kwargs) if item)
-        return '%s(%s)' % (self._method, full_args)
-
-
-def call(method, *args, **kwargs):
-    return _MethodCall(method, args, kwargs)
-
-
-class DroneUtility(object):
-    """
-    This class executes actual OS calls on the drone machine.
-
-    All paths going into and out of this class are absolute.
-    """
-    _WARNING_DURATION = 400
-
-    def __init__(self):
-        # Tattoo ourselves so that all of our spawn bears our mark.
-        os.putenv(DARK_MARK_ENVIRONMENT_VAR, str(os.getpid()))
-
-        self.warnings = []
-        self._subcommands = []
-
-
-    def initialize(self, results_dir):
-        temporary_directory = os.path.join(results_dir, _TEMPORARY_DIRECTORY)
-        if os.path.exists(temporary_directory):
-            # TODO crbug.com/391111: until we have a better solution to
-            # periodically clean up tmp files, we have to use rm -rf to delete
-            # the whole folder. shutil.rmtree has performance issues when a
-            # folder holds a large number of files, e.g., drone_tmp.
-            os.system('rm -rf %s' % temporary_directory)
-        self._ensure_directory_exists(temporary_directory)
-        # TODO (sbasi) crbug.com/345011 - Remove this configuration variable
-        # and clean up build_externals so it no-ops.
-        build_externals = global_config.global_config.get_config_value(
-                scheduler_config.CONFIG_SECTION, 'drone_build_externals',
-                default=True, type=bool)
-        if build_externals:
-            build_extern_cmd = os.path.join(common.autotest_dir,
-                                            'utils', 'build_externals.py')
-            utils.run(build_extern_cmd)
-
-
-    def _warn(self, warning):
-        self.warnings.append(warning)
-
-
-    def refresh(self, pidfile_paths):
-        """Refreshes our view of the processes referred to by pdfile_paths.
-
-        See drone_utility.ProcessRefresher.__call__ for details.
-        """
-        check_mark = global_config.global_config.get_config_value(
-            'SCHEDULER', 'check_processes_for_dark_mark', bool, False)
-        use_pool = global_config.global_config.get_config_value(
-            'SCHEDULER', 'drone_utility_refresh_use_pool', bool, False)
-        result, warnings = ProcessRefresher(check_mark, use_pool)(pidfile_paths)
-        self.warnings += warnings
-        return result
-
-
-    def get_signal_queue_to_kill(self, process):
-        """Get the signal queue needed to kill a process.
-
-        The autoserv process has a handler for SIGTERM, in which it can do
-        some cleanup work. However, aborting a process with SIGTERM followed
-        by SIGKILL has overhead, detailed in the following CL:
-        https://chromium-review.googlesource.com/230323
-        This method checks the process's arguments to determine whether
-        SIGTERM is required, and returns the signal queue accordingly.
-
-        @param process: A drone_manager.Process object to be killed.
-
-        @return: The signal queue needed to kill a process.
-
-        """
-        signal_queue_with_sigterm = (signal.SIGTERM, signal.SIGKILL)
-        try:
-            ps_output = subprocess.check_output(
-                    ['/bin/ps', '-p', str(process.pid), '-o', 'args'])
-            # For tests running with server-side packaging, SIGTERM needs to
-            # be sent so the autoserv process can destroy its test container.
-            if '--require-ssp' in ps_output:
-                logging.debug('PID %d requires SIGTERM on abort to clean up '
-                              'its container.', process.pid)
-                return signal_queue_with_sigterm
-        except subprocess.CalledProcessError:
-            # Ignore errors, return the signal queue with SIGTERM to be safe.
-            return signal_queue_with_sigterm
-        # Default to kill the process with SIGKILL directly.
-        return (signal.SIGKILL,)
-
-
-    def kill_processes(self, process_list):
-        """Send signals escalating in severity to the processes in process_list.
-
-        @param process_list: A list of drone_manager.Process objects
-                             representing the processes to kill.
-        """
-        try:
-            logging.info('List of processes to be killed: %s', process_list)
-            processes_to_kill = {}
-            for p in process_list:
-                signal_queue = self.get_signal_queue_to_kill(p)
-                processes_to_kill[signal_queue] = (
-                        processes_to_kill.get(signal_queue, []) + [p])
-            sig_counts = {}
-            for signal_queue, processes in processes_to_kill.iteritems():
-                sig_counts.update(utils.nuke_pids(
-                        [-process.pid for process in processes],
-                        signal_queue=signal_queue))
-        except error.AutoservRunError as e:
-            self._warn('Error occurred when killing processes. Error: %s' % e)
-
-
-    def _ensure_directory_exists(self, path):
-        if not os.path.exists(path):
-            os.makedirs(path)
-            return
-        if os.path.isdir(path):
-            return
-        assert os.path.isfile(path)
-        if '/hosts/' in path:
-            return
-        raise IOError('Path %s exists as a file, not a directory' % path)
-
-
-    def execute_command(self, command, working_directory, log_file,
-                        pidfile_name):
-        out_file = None
-        if log_file:
-            self._ensure_directory_exists(os.path.dirname(log_file))
-            try:
-                out_file = open(log_file, 'a')
-                separator = ('*' * 80) + '\n'
-                out_file.write('\n' + separator)
-                out_file.write("%s> %s\n" % (time.strftime("%X %x"), command))
-                out_file.write(separator)
-            except (OSError, IOError):
-                pass
-
-        if not out_file:
-            out_file = open('/dev/null', 'w')
-
-        in_devnull = open('/dev/null', 'r')
-
-        self._ensure_directory_exists(working_directory)
-        pidfile_path = os.path.join(working_directory, pidfile_name)
-        if os.path.exists(pidfile_path):
-            self._warn('Pidfile %s already exists' % pidfile_path)
-            os.remove(pidfile_path)
-
-        subprocess.Popen(command, stdout=out_file, stderr=subprocess.STDOUT,
-                         stdin=in_devnull)
-        out_file.close()
-        in_devnull.close()
-
-
-    def write_to_file(self, file_path, contents, is_retry=False):
-        """Write the specified contents to the end of the given file.
-
-        @param file_path: Path to the file.
-        @param contents: Content to be written to the file.
-        @param is_retry: True if this is a retry after the file permissions
-                         have been corrected.
-        """
-        self._ensure_directory_exists(os.path.dirname(file_path))
-        try:
-            file_object = open(file_path, 'a')
-            file_object.write(contents)
-            file_object.close()
-        except IOError as e:
-            # TODO(dshi): crbug.com/459344 Remove the following retry when the
-            # test container can be an unprivileged container.
-            # If the write failed with 'Permission denied', one possible cause
-            # is that the file was created in a container and is thus owned by
-            # root. If so, fix the file permissions and try again.
-            if e.errno == 13 and not is_retry:
-                logging.error('Error writing to file %s: %s. Will retry.',
-                              file_path, e)
-                utils.run('sudo chown %s "%s"' % (os.getuid(), file_path))
-                utils.run('sudo chgrp %s "%s"' % (os.getgid(), file_path))
-                self.write_to_file(file_path, contents, is_retry=True)
-            else:
-                self._warn('Error writing to file %s: %s' % (file_path, e))
-
-
-    def copy_file_or_directory(self, source_path, destination_path):
-        """
-        This interface is designed to match server.hosts.abstract_ssh.get_file
-        (and send_file).  That is, if the source_path ends with a slash, the
-        contents of the directory are copied; otherwise, the directory itself
-        is copied.
-        """
-        if self._same_file(source_path, destination_path):
-            return
-        self._ensure_directory_exists(os.path.dirname(destination_path))
-        if source_path.endswith('/'):
-            # copying a directory's contents to another directory
-            assert os.path.isdir(source_path)
-            assert os.path.isdir(destination_path)
-            for filename in os.listdir(source_path):
-                self.copy_file_or_directory(
-                    os.path.join(source_path, filename),
-                    os.path.join(destination_path, filename))
-        elif os.path.isdir(source_path):
-            try:
-                shutil.copytree(source_path, destination_path, symlinks=True)
-            except shutil.Error:
-                # Ignore directory copy errors due to missing files. These
-                # errors come from a race with gs_offloader, which zips up
-                # folders holding too many files: a repair job may try to copy
-                # provision job results into the test job result folder while
-                # gs_offloader is uploading the provision job result and
-                # zipping up those folders.
-                pass
-        elif os.path.islink(source_path):
-            # copied from shutil.copytree()
-            link_to = os.readlink(source_path)
-            os.symlink(link_to, destination_path)
-        else:
-            try:
-                shutil.copy(source_path, destination_path)
-            except IOError:
-                # Ignore copy error following the same above reason.
-                pass
-
-
-    def _same_file(self, source_path, destination_path):
-        """Checks if the source and destination are the same
-
-        Returns True if the destination is the same as the source, False
-        otherwise. Also returns False if the destination does not exist.
-        """
-        if not os.path.exists(destination_path):
-            return False
-        return os.path.samefile(source_path, destination_path)
-
-
-    def cleanup_orphaned_containers(self):
-        """Run lxc_cleanup script to clean up orphaned container.
-        """
-        # The script needs to run with sudo as the containers are privileged.
-        # TODO(dshi): crbug.com/459344 Call lxc_cleanup.main when the test
-        # container can be an unprivileged container.
-        command = ['sudo', LXC_CLEANUP_SCRIPT, '-x', '-v', '-l',
-                   LXC_CLEANUP_LOG_FILE]
-        logging.info('Running %s', command)
-        # stdout and stderr need to be directed to /dev/null; otherwise, the
-        # exit of the drone_utility process will kill the lxc_cleanup script.
-        subprocess.Popen(
-                command, shell=False, stdin=None, stdout=open('/dev/null', 'w'),
-                stderr=open('/dev/null', 'a'), preexec_fn=os.setpgrp)
-
-
-    def wait_for_all_async_commands(self):
-        for subproc in self._subcommands:
-            subproc.fork_waitfor()
-        self._subcommands = []
-
-
-    def _poll_async_commands(self):
-        still_running = []
-        for subproc in self._subcommands:
-            if subproc.poll() is None:
-                still_running.append(subproc)
-        self._subcommands = still_running
-
-
-    def _wait_for_some_async_commands(self):
-        self._poll_async_commands()
-        max_processes = scheduler_config.config.max_transfer_processes
-        while len(self._subcommands) >= max_processes:
-            time.sleep(1)
-            self._poll_async_commands()
-
-
-    def run_async_command(self, function, args):
-        subproc = subcommand.subcommand(function, args)
-        self._subcommands.append(subproc)
-        subproc.fork_start()
-
-
-    def _sync_get_file_from(self, hostname, source_path, destination_path):
-        logging.debug('_sync_get_file_from hostname: %s, source_path: %s, '
-                      'destination_path: %s', hostname, source_path,
-                      destination_path)
-        self._ensure_directory_exists(os.path.dirname(destination_path))
-        host = create_host(hostname)
-        host.get_file(source_path, destination_path, delete_dest=True)
-
-
-    def get_file_from(self, hostname, source_path, destination_path):
-        self.run_async_command(self._sync_get_file_from,
-                               (hostname, source_path, destination_path))
-
-
-    def _sync_send_file_to(self, hostname, source_path, destination_path,
-                           can_fail):
-        logging.debug('_sync_send_file_to. hostname: %s, source_path: %s, '
-                      'destination_path: %s, can_fail: %s', hostname,
-                      source_path, destination_path, can_fail)
-        host = create_host(hostname)
-        try:
-            host.run('mkdir -p ' + os.path.dirname(destination_path))
-            host.send_file(source_path, destination_path, delete_dest=True)
-        except error.AutoservError:
-            if not can_fail:
-                raise
-
-            if os.path.isdir(source_path):
-                failed_file = os.path.join(source_path, _TRANSFER_FAILED_FILE)
-                file_object = open(failed_file, 'w')
-                try:
-                    file_object.write('%s:%s\n%s\n%s' %
-                                      (hostname, destination_path,
-                                       datetime.datetime.now(),
-                                       traceback.format_exc()))
-                finally:
-                    file_object.close()
-            else:
-                copy_to = destination_path + _TRANSFER_FAILED_FILE
-                self._ensure_directory_exists(os.path.dirname(copy_to))
-                self.copy_file_or_directory(source_path, copy_to)
-
-
-    def send_file_to(self, hostname, source_path, destination_path,
-                     can_fail=False):
-        self.run_async_command(self._sync_send_file_to,
-                               (hostname, source_path, destination_path,
-                                can_fail))
-
-
-    def _report_long_execution(self, calls, duration):
-        call_count = {}
-        for call in calls:
-            call_count.setdefault(call._method, 0)
-            call_count[call._method] += 1
-        call_summary = '\n'.join('%d %s' % (count, method)
-                                 for method, count in call_count.iteritems())
-        self._warn('Execution took %f sec\n%s' % (duration, call_summary))
-
-
-    def execute_calls(self, calls):
-        results = []
-        start_time = time.time()
-        max_processes = scheduler_config.config.max_transfer_processes
-        for method_call in calls:
-            results.append(method_call.execute_on(self))
-            if len(self._subcommands) >= max_processes:
-                self._wait_for_some_async_commands()
-        self.wait_for_all_async_commands()
-
-        duration = time.time() - start_time
-        if duration > self._WARNING_DURATION:
-            self._report_long_execution(calls, duration)
-
-        warnings = self.warnings
-        self.warnings = []
-        return dict(results=results, warnings=warnings)
-
-
-_MAX_REFRESH_POOL_SIZE = 50
-
-class ProcessRefresher(object):
-    """Object to refresh process information from give pidfiles.
-
-    Usage: ProcessRefresh(True)(pidfile_list)
-    """
-
-    def __init__(self, check_mark, use_pool=False):
-        """
-        @param check_mark: If True, only consider processes that were
-                explicitly marked by an earlier drone_utility call as
-                autotest-related processes.
-        @param use_pool: If True, use a multiprocessing.Pool to parallelize
-                costly operations.
-        """
-        self._check_mark = check_mark
-        self._use_pool = use_pool
-        self._pool = None
-
-
-    def __call__(self, pidfile_paths):
-        """
-        @param pidfile_paths: A list of paths to check for pidfiles.
-
-        @returns (result, warnings)
-            where result is a dict with the following keys:
-            - pidfiles: dict mapping pidfile paths to file contents, for
-              pidfiles that exist.
-            - all_processes: list of dicts corresponding to all running
-              processes. Each dict contains pid, pgid, ppid, comm, and args
-              (see "man ps" for details).
-            - autoserv_processes: likewise, restricted to autoserv processes.
-            - parse_processes: likewise, restricted to parse processes.
-            - pidfiles_second_read: same info as pidfiles, but gathered after
-              the processes are scanned.
-            and warnings is a list of warnings generated during process refresh.
-        """
-
-        if self._use_pool:
-            pool_size = max(
-                    min(len(pidfile_paths), _MAX_REFRESH_POOL_SIZE),
-                    1)
-            self._pool = multiprocessing.Pool(pool_size)
-        else:
-            pool_size = 0
-        logging.info('Refreshing %d pidfiles with %d helper processes',
-                     len(pidfile_paths), pool_size)
-
-        warnings = []
-        # It is necessary to explicitly force this to be a list because results
-        # are pickled by DroneUtility.
-        proc_infos = list(_get_process_info())
-
-        autoserv_processes, extra_warnings = self._filter_proc_infos(
-                proc_infos, 'autoserv')
-        warnings += extra_warnings
-        parse_processes, extra_warnings = self._filter_proc_infos(proc_infos,
-                                                                  'parse')
-        warnings += extra_warnings
-        site_parse_processes, extra_warnings = self._filter_proc_infos(
-                proc_infos, 'site_parse')
-        warnings += extra_warnings
-
-        result = {
-                'pidfiles': self._read_pidfiles(pidfile_paths),
-                'all_processes': proc_infos,
-                'autoserv_processes': autoserv_processes,
-                'parse_processes': (parse_processes + site_parse_processes),
-                'pidfiles_second_read': self._read_pidfiles(pidfile_paths),
-        }
-        return result, warnings
-
-
-    def _read_pidfiles(self, pidfile_paths):
-        """Uses a process pool to read requested pidfile_paths."""
-        if self._use_pool:
-            contents = self._pool.map(_read_pidfile, pidfile_paths)
-            contents = [c for c in contents if c is not None]
-            return {k: v for k, v in contents}
-        else:
-            pidfiles = {}
-            for path in pidfile_paths:
-                content = _read_pidfile(path)
-                if content is None:
-                    continue
-                pidfiles[content.path] = content.content
-            return pidfiles
-
-
-    def _filter_proc_infos(self, proc_infos, command_name):
-        """Filters process info for the given command_name.
-
-        Examines ps output as returned by _get_process_info and returns
-        the process dicts for processes matching the given command name.
-
-        @param proc_infos: ps output as returned by _get_process_info.
-        @param command_name: The name of the command, eg 'autoserv'.
-
-        @return: (proc_infos, warnings) where proc_infos is a list of process
-                dicts as returned by _get_process_info and warnings is a list
-                of warnings generated while filtering.
-        """
-        proc_infos = [info for info in proc_infos
-                      if info['comm'] == command_name]
-        if not self._check_mark:
-            return proc_infos, []
-
-        if self._use_pool:
-            dark_marks = self._pool.map(
-                    _process_has_dark_mark,
-                    [info['pid'] for info in proc_infos]
-            )
-        else:
-            dark_marks = [_process_has_dark_mark(info['pid'])
-                          for info in proc_infos]
-
-        marked_proc_infos = []
-        warnings = []
-        for marked, info in itertools.izip(dark_marks, proc_infos):
-            if marked:
-                marked_proc_infos.append(info)
-            else:
-                warnings.append(
-                        '%(comm)s process pid %(pid)s has no dark mark; '
-                        'ignoring.' % info)
-        return marked_proc_infos, warnings
-
-
-def create_host(hostname):
-    # TODO(crbug.com/739466) Delay import to avoid a ~0.7 second penalty for
-    # drone_utility calls that don't actually interact with DUTs.
-    from autotest_lib.server import hosts
-    username = global_config.global_config.get_config_value(
-        'SCHEDULER', hostname + '_username', default=getpass.getuser())
-    return hosts.SSHHost(hostname, user=username)
-
-
-def parse_input():
-    input_chunks = []
-    chunk_of_input = sys.stdin.read()
-    while chunk_of_input:
-        input_chunks.append(chunk_of_input)
-        chunk_of_input = sys.stdin.read()
-    pickled_input = ''.join(input_chunks)
-
-    try:
-        return pickle.loads(pickled_input)
-    except Exception:
-        separator = '*' * 50
-        raise ValueError('Unpickling input failed\n'
-                         'Input: %r\n'
-                         'Exception from pickle:\n'
-                         '%s\n%s\n%s' %
-                         (pickled_input, separator, traceback.format_exc(),
-                          separator))
-
-
-def _parse_args(args):
-    parser = argparse.ArgumentParser(description='Local drone process manager.')
-    parser.add_argument('--call_time',
-                        help='Time this process was invoked from the master',
-                        default=None, type=float)
-    return parser.parse_args(args)
-
-
-def return_data(data):
-    print pickle.dumps(data)
-
-def _process_has_dark_mark(pid):
-    """Checks if a process was launched earlier by drone_utility.
-
-    @param pid: The pid of the process to check.
-    """
-    try:
-        with open('/proc/%s/environ' % pid, 'rb') as env_file:
-            env_data = env_file.read()
-    except EnvironmentError:
-        return False
-    return DARK_MARK_ENVIRONMENT_VAR in env_data
-
-
-_PS_ARGS = ('pid', 'pgid', 'ppid', 'comm', 'args')
-def _get_process_info():
-    """Parse ps output for all process information.
-
-    @returns A generator of dicts. Each dict has the following keys:
-        - comm: command_name,
-        - pgid: process group id,
-        - ppid: parent process id,
-        - pid: process id,
-        - args: args the command was invoked with,
-    """
-    @retry.retry(subprocess.CalledProcessError,
-                 timeout_min=0.5, delay_sec=0.25)
-    def run_ps():
-        return subprocess.check_output(
-                ['/bin/ps', '--no-header', 'x', '-o', ','.join(_PS_ARGS)])
-
-    ps_output = run_ps()
-    # split each line into the columns output by ps
-    split_lines = [line.split(None, 4) for line in ps_output.splitlines()]
-    return (dict(itertools.izip(_PS_ARGS, line_components))
-            for line_components in split_lines)
-
-
-_PidfileContent = collections.namedtuple('_PidfileContent', ['path', 'content'])
-def _read_pidfile(pidfile_path):
-    """Reads the content of the given pidfile if it exists
-
-    @param pidfile_path: Path of the file to read.
-    @returns: _PidfileContent tuple on success, None otherwise.
-    """
-    if not os.path.exists(pidfile_path):
-        return None
-    try:
-        with open(pidfile_path, 'r') as file_object:
-            return _PidfileContent(pidfile_path, file_object.read())
-    except IOError:
-        return None
-
-
-def main():
-    calls = parse_input()
-    args = _parse_args(sys.argv[1:])
-
-    drone_utility = DroneUtility()
-    return_value = drone_utility.execute_calls(calls)
-    return_data(return_value)
-
-
-if __name__ == '__main__':
-    main()
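
A technique worth noting from the deleted code is the "dark mark": every process spawned by drone_utility inherited the AUTOTEST_SCHEDULER_DARK_MARK environment variable, and a later invocation could recognize its own processes during recovery by scanning /proc/<pid>/environ. A standalone sketch of that pattern; the variable name is the module's, everything else is illustrative:

    import os
    import subprocess

    # Mark: children inherit this environment variable.
    os.environ['AUTOTEST_SCHEDULER_DARK_MARK'] = str(os.getpid())
    child = subprocess.Popen(['sleep', '60'])

    # Recognize: only processes we spawned carry the mark.
    with open('/proc/%d/environ' % child.pid, 'rb') as env_file:
        assert 'AUTOTEST_SCHEDULER_DARK_MARK' in env_file.read()
    child.kill()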