[autotest] Hand off parsing to lucifer (reland)
BUG=chromium:748234
TEST=None
Change-Id: Iecc7bc8e74ce0f783600bdf7026d89a76ba92fd4
Reviewed-on: https://chromium-review.googlesource.com/757087
Commit-Ready: Allen Li <ayatane@chromium.org>
Tested-by: Allen Li <ayatane@chromium.org>
Reviewed-by: Allen Li <ayatane@chromium.org>
diff --git a/scheduler/luciferlib.py b/scheduler/luciferlib.py
new file mode 100644
index 0000000..74c0210
--- /dev/null
+++ b/scheduler/luciferlib.py
@@ -0,0 +1,217 @@
+# Copyright 2017 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Library providing an API to lucifer."""
+
+import os
+import pipes
+
+import common
+from autotest_lib.client.bin import local_host
+from autotest_lib.client.common_lib import global_config
+from autotest_lib.server.hosts import ssh_host
+
+_config = global_config.global_config
+_SECTION = 'LUCIFER'
+
+# TODO(crbug.com/748234): Move these to shadow_config.ini
+# See also drones.AUTOTEST_INSTALL_DIR
+_AUTOTEST_DIR = '/usr/local/autotest'
+_RUN_JOB_PATH = '/opt/infra-tools/usr/bin/lucifer_run_job'
+_WATCHER_PATH = '/opt/infra-tools/usr/bin/lucifer_watcher'
+_LEASE_DIR = '/var/lib/lucifer/leases'
+
+_JOB_REPORTER_PATH = os.path.join(_AUTOTEST_DIR, 'bin' 'job_reporter')
+
+
+def is_lucifer_enabled():
+ """Return True if lucifer is enabled in the config."""
+ return _config.get_config_value(_SECTION, 'send_jobs_to_lucifer',
+ type=bool)
+
+
+def is_lucifer_owned(job):
+ """Return True if job is already sent to lucifer."""
+ return job.jobhandoff_set.exists()
+
+
+def spawn_job_handler(manager, job, autoserv_exit, pidfile_id=None):
+ """Spawn job_reporter to handle a job.
+
+ Pass all arguments by keyword.
+
+ @param manager: DroneManager instance
+ @param job: Job instance
+ @param autoserv_exit: autoserv exit status
+ @param pidfile_id: PidfileId instance
+ """
+ manager = _DroneManager(manager)
+ if pidfile_id is None:
+ drone = manager.pick_drone_to_use()
+ else:
+ drone = manager.get_drone_for_pidfile(pidfile_id)
+ args = [
+ '--run-job-path', _RUN_JOB_PATH,
+ '--leasedir', _LEASE_DIR,
+ '--job-id', job.id,
+ '--autoserv-exit', autoserv_exit,
+ ]
+ # lucifer_run_job arguments
+ results_dir = _results_dir(manager, job)
+ args.extend([
+ '-resultsdir', results_dir,
+ '-autotestdir', _AUTOTEST_DIR,
+ '-watcherpath', _WATCHER_PATH,
+ ])
+ output_file = os.path.join(results_dir, 'job_reporter_output.log')
+ drone.spawn(_JOB_REPORTER_PATH, args, output_file=output_file)
+
+
+class _DroneManager(object):
+ """Simplified drone API."""
+
+ def __init__(self, old_manager):
+ """Initialize instance.
+
+ @param old_manager: old style DroneManager
+ """
+ self._manager = old_manager
+
+ def get_drone_for_pidfile(self, pidfile_id):
+ """Return a drone to use from a pidfile.
+
+ @param pidfile_id: PidfileId instance.
+ """
+ return _wrap_drone(self._manager.get_drone_for_pidfile_id(pidfile_id))
+
+ def pick_drone_to_use(self, num_processes=1, prefer_ssp=False):
+ """Return a drone to use.
+
+ Various options can be passed to optimize drone selection.
+
+ @param num_processes: number of processes the drone is intended
+ to run
+ @param prefer_ssp: indicates whether drones supporting
+ server-side packaging should be preferred. The returned
+ drone is not guaranteed to support it.
+ """
+ old_drone = self._manager.pick_drone_to_use(
+ num_processes=num_processes,
+ prefer_ssp=prefer_ssp,
+ )
+ return _wrap_drone(old_drone)
+
+ def absolute_path(self, path):
+ """Return absolute path for drone results.
+
+ The returned path might be remote.
+ """
+ return self._manager.absolute_path(path)
+
+
+def _wrap_drone(old_drone):
+ """Wrap an old style drone."""
+ host = old_drone._host
+ if isinstance(host, local_host.LocalHost):
+ return LocalDrone()
+ elif isinstance(host, ssh_host.SSHHost):
+ return RemoteDrone(host)
+ else:
+ raise TypeError('Drone has an unknown host type')
+
+
+def _results_dir(manager, job):
+ """Return results dir for a job.
+
+ Path may be on a remote host.
+ """
+ return manager.absolute_path(_working_directory(job))
+
+
+def _working_directory(job):
+ return _get_consistent_execution_path(job.hostqueueentry_set)
+
+
+def _get_consistent_execution_path(execution_entries):
+ first_execution_path = execution_entries[0].execution_path()
+ for execution_entry in execution_entries[1:]:
+ assert execution_entry.execution_path() == first_execution_path, (
+ '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
+ execution_entry,
+ first_execution_path,
+ execution_entries[0]))
+ return first_execution_path
+
+
+class Drone(object):
+ """Simplified drone API."""
+
+ def spawn(self, path, args, output_file):
+ """Spawn an independent process.
+
+ path must be an absolute path. path may be on a remote machine.
+ args is a list of arguments.
+
+ The process is spawned in its own session. It should not try to
+ obtain a controlling terminal.
+
+ The new process will have stdin opened to /dev/null and stdout,
+ stderr opened to output_file.
+
+ output_file is a pathname, but how it is interpreted is
+ implementation defined, e.g., it may be a remote file.
+ """
+
+
+class LocalDrone(Drone):
+ """Local implementation of Drone."""
+
+ def spawn(self, path, args, output_file):
+ _spawn(path, [path] + args, output_file)
+
+
+class RemoteDrone(Drone):
+ """Remote implementation of Drone through SSH."""
+
+ def __init__(self, host):
+ if not isinstance(host, ssh_host.SSHHost):
+ raise TypeError('RemoteDrone must be passed an SSHHost')
+ self._host = host
+
+ def spawn(self, path, args, output_file):
+ cmd_parts = [path] + args
+ safe_cmd = ' '.join(pipes.quote(part) for part in cmd_parts)
+ safe_file = pipes.quote(output_file)
+ # SSH creates a session for each command, so we do not have to
+ # do it.
+ self._host.run('%(cmd)s <%(null)s >%(file)s 2>&1 &'
+ % {'cmd': safe_cmd,
+ 'file': safe_file,
+ 'null': os.devnull})
+
+
+def _spawn(path, argv, output_file):
+ """Spawn a new process in its own session.
+
+ path must be an absolute path. The first item in argv should be
+ path.
+
+ In the calling process, this function returns on success.
+ The forked process puts itself in its own session and execs.
+
+ The new process will have stdin opened to /dev/null and stdout,
+ stderr opened to output_file.
+ """
+ ppid = os.fork()
+ if not ppid:
+ return
+ os.setsid()
+ null_fd = os.open(os.devnull, os.O_RDONLY)
+ os.dup2(null_fd, 0)
+ os.close(null_fd)
+ out_fd = os.open(output_file, os.O_WRONLY)
+ os.dup2(out_fd, 1)
+ os.dup2(out_fd, 2)
+ os.close(out_fd)
+ os.execv(path, argv)