[autotest] Hand off parsing to lucifer (reland)

BUG=chromium:748234
TEST=None

Change-Id: Iecc7bc8e74ce0f783600bdf7026d89a76ba92fd4
Reviewed-on: https://chromium-review.googlesource.com/757087
Commit-Ready: Allen Li <ayatane@chromium.org>
Tested-by: Allen Li <ayatane@chromium.org>
Reviewed-by: Allen Li <ayatane@chromium.org>
diff --git a/global_config.ini b/global_config.ini
index 63b57bb..5fca9a4 100644
--- a/global_config.ini
+++ b/global_config.ini
@@ -462,3 +462,6 @@
 stable_version_dragonboard: git_mnc-brillo-dev/dragonboard-userdebug/2512766
 stable_version_edison: git_nyc-jaqen-dev/jaqen_edison-userdebug/2979181
 package_url_pattern: %s/static/%s
+
+[LUCIFER]
+send_jobs_to_lucifer: False
diff --git a/scheduler/drone_manager.py b/scheduler/drone_manager.py
index 3995484..6c30249 100644
--- a/scheduler/drone_manager.py
+++ b/scheduler/drone_manager.py
@@ -353,6 +353,14 @@
         return self._get_drone_for_process(pidfile_contents.process)
 
 
+    def get_drone_for_pidfile_id(self, pidfile_id):
+        """Public API for luciferlib: look up the drone for a pidfile.
+
+        @param pidfile_id: PidfileId instance.
+        """
+        return self._get_drone_for_pidfile_id(pidfile_id)
+
+
     def _drop_old_pidfiles(self):
         # use items() since the dict is modified in unregister_pidfile()
         for pidfile_id, info in self._registered_pidfile_info.items():
@@ -628,6 +636,30 @@
         return min(drones, key=lambda d: d.used_capacity())
 
 
+    def pick_drone_to_use(self, num_processes=1, prefer_ssp=False):
+        """Return a drone to use.
+
+        Various options can be passed to optimize drone selection.
+
+        This public API is exposed for luciferlib to wrap.  It puts no
+        restriction on the username or the allowed drone hostnames.
+
+        @param num_processes: number of processes the drone is intended
+            to run.
+        @param prefer_ssp: whether drones supporting server-side
+            packaging should be preferred.  The returned drone is not
+            guaranteed to support it.
+
+        @returns: a drone instance (see drones.py).
+        """
+        return self._choose_drone_for_execution(
+                num_processes=num_processes,
+                username=None,  # Always allow all drones
+                drone_hostnames_allowed=None,  # Always allow all drones
+                require_ssp=prefer_ssp,
+        )
+
+
     def _choose_drone_for_execution(self, num_processes, username,
                                     drone_hostnames_allowed,
                                     require_ssp=False):
diff --git a/scheduler/luciferlib.py b/scheduler/luciferlib.py
new file mode 100644
index 0000000..74c0210
--- /dev/null
+++ b/scheduler/luciferlib.py
@@ -0,0 +1,217 @@
+# Copyright 2017 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Library providing an API to lucifer."""
+
+import os
+import pipes
+
+import common
+from autotest_lib.client.bin import local_host
+from autotest_lib.client.common_lib import global_config
+from autotest_lib.server.hosts import ssh_host
+
+_config = global_config.global_config
+_SECTION = 'LUCIFER'
+
+# TODO(crbug.com/748234): Move these to shadow_config.ini
+# See also drones.AUTOTEST_INSTALL_DIR
+_AUTOTEST_DIR = '/usr/local/autotest'
+_RUN_JOB_PATH = '/opt/infra-tools/usr/bin/lucifer_run_job'
+_WATCHER_PATH = '/opt/infra-tools/usr/bin/lucifer_watcher'
+_LEASE_DIR = '/var/lib/lucifer/leases'
+# Path to the job_reporter executable under _AUTOTEST_DIR/bin.
+_JOB_REPORTER_PATH = os.path.join(_AUTOTEST_DIR, 'bin', 'job_reporter')
+
+
+def is_lucifer_enabled():
+    """Return the boolean send_jobs_to_lucifer option from the config."""
+    option = 'send_jobs_to_lucifer'
+    return _config.get_config_value(_SECTION, option, type=bool)
+
+
+def is_lucifer_owned(job):
+    """Return True if job has any job handoff, i.e. was sent to lucifer."""
+    return job.jobhandoff_set.exists()
+
+
+def spawn_job_handler(manager, job, autoserv_exit, pidfile_id=None):
+    """Spawn job_reporter to handle a job.
+
+    Pass all arguments by keyword.
+
+    @param manager: DroneManager instance
+    @param job: Job instance
+    @param autoserv_exit: autoserv exit status
+    @param pidfile_id: PidfileId instance; if None, a drone is picked
+    """
+    manager = _DroneManager(manager)
+    if pidfile_id is None:
+        drone = manager.pick_drone_to_use()
+    else:
+        drone = manager.get_drone_for_pidfile(pidfile_id)
+    args = [
+            '--run-job-path', _RUN_JOB_PATH,
+            '--leasedir', _LEASE_DIR,
+            '--job-id', str(job.id),  # args get shell-quoted/exec'd: str only
+            '--autoserv-exit', str(autoserv_exit),
+    ]
+    # lucifer_run_job arguments
+    results_dir = _results_dir(manager, job)
+    args.extend([
+            '-resultsdir', results_dir,
+            '-autotestdir', _AUTOTEST_DIR,
+            '-watcherpath', _WATCHER_PATH,
+    ])
+    output_file = os.path.join(results_dir, 'job_reporter_output.log')
+    drone.spawn(_JOB_REPORTER_PATH, args, output_file=output_file)
+
+
+class _DroneManager(object):
+    """Simplified facade over an old style DroneManager."""
+
+    def __init__(self, old_manager):
+        """Wrap old_manager.
+
+        @param old_manager: old style DroneManager
+        """
+        self._manager = old_manager
+
+    def get_drone_for_pidfile(self, pidfile_id):
+        """Return the wrapped drone associated with a pidfile.
+
+        @param pidfile_id: PidfileId instance.
+        """
+        old_drone = self._manager.get_drone_for_pidfile_id(pidfile_id)
+        return _wrap_drone(old_drone)
+
+    def pick_drone_to_use(self, num_processes=1, prefer_ssp=False):
+        """Pick a drone and return it wrapped.
+
+        Various options can be passed to optimize drone selection.
+
+        @param num_processes: number of processes the drone is intended
+            to run
+        @param prefer_ssp: indicates whether drones supporting
+            server-side packaging should be preferred.  The returned
+            drone is not guaranteed to support it.
+        """
+        return _wrap_drone(self._manager.pick_drone_to_use(
+                num_processes=num_processes,
+                prefer_ssp=prefer_ssp,
+        ))
+
+    def absolute_path(self, path):
+        """Return absolute path for drone results.
+
+        The returned path might be remote.
+        """
+        return self._manager.absolute_path(path)
+
+
+def _wrap_drone(old_drone):
+    """Wrap an old style drone in the matching simplified Drone."""
+    host = old_drone._host
+    if isinstance(host, local_host.LocalHost):
+        return LocalDrone()
+    if isinstance(host, ssh_host.SSHHost):
+        return RemoteDrone(host)
+    # Neither a local nor an SSH host: there is no wrapper for it.
+    raise TypeError('Drone has an unknown host type')
+
+
+def _results_dir(manager, job):
+    """Return the absolute results directory for a job.
+
+    The path comes from manager.absolute_path() and may be on a remote host.
+    """
+    return manager.absolute_path(_working_directory(job))
+
+
+def _working_directory(job):
+    """Return the execution path shared by all of job's queue entries."""
+    entries = list(job.hostqueueentry_set.all())  # managers are not indexable
+    return _get_consistent_execution_path(entries)
+
+
+def _get_consistent_execution_path(execution_entries):
+    """Return the execution path that all execution_entries agree on."""
+    expected = execution_entries[0].execution_path()
+    for entry in execution_entries[1:]:
+        assert entry.execution_path() == expected, '%s (%s) != %s (%s)' % (
+            entry.execution_path(), entry, expected, execution_entries[0])
+    return expected
+
+
+class Drone(object):
+    """Simplified drone API; subclasses implement spawn()."""
+
+    def spawn(self, path, args, output_file):
+        """Spawn an independent process.
+
+        The process is spawned in its own session.  It should not try to
+        obtain a controlling terminal.
+
+        The new process will have stdin opened to /dev/null and stdout,
+        stderr opened to output_file.
+
+        @param path: absolute path of the program; may be on a remote
+            machine.
+        @param args: list of arguments.
+        @param output_file: pathname whose interpretation is
+            implementation defined, e.g., it may be a remote file.
+        """
+
+
+class LocalDrone(Drone):
+    """Local implementation of Drone."""
+
+    def spawn(self, path, args, output_file):
+        _spawn(path, [path] + args, output_file)  # argv[0] is path itself
+
+
+class RemoteDrone(Drone):
+    """Remote implementation of Drone through SSH."""
+
+    def __init__(self, host):
+        """Initialize instance; host must be an SSHHost."""
+        if not isinstance(host, ssh_host.SSHHost):
+            raise TypeError('RemoteDrone must be passed an SSHHost')
+        self._host = host
+
+    def spawn(self, path, args, output_file):
+        """Run the shell-quoted command in the background on the host."""
+        quoted_cmd = ' '.join(pipes.quote(word) for word in [path] + args)
+        # SSH creates a session for each command, so we do not have to
+        # do it.
+        self._host.run('%(cmd)s <%(null)s >%(file)s 2>&1 &'
+                       % {'cmd': quoted_cmd,
+                          'file': pipes.quote(output_file),
+                          'null': os.devnull})
+
+
+def _spawn(path, argv, output_file):
+    """Spawn a new process in its own session.
+
+    path must be an absolute path; argv[0] should be path.  The caller
+    returns right after the fork.  The child starts its own session,
+    opens stdin to /dev/null and stdout/stderr to output_file (created
+    or truncated, like the shell's `>`), then execs path.
+    """
+    if os.fork():
+        return  # Parent: fork() returned the child's pid.
+    try:
+        os.setsid()
+        null_fd = os.open(os.devnull, os.O_RDONLY)
+        os.dup2(null_fd, 0)
+        os.close(null_fd)
+        out_fd = os.open(output_file,
+                         os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o666)
+        os.dup2(out_fd, 1)
+        os.dup2(out_fd, 2)
+        os.close(out_fd)
+        os.execv(path, argv)
+    finally:
+        # Setup or exec failed: never return into the caller's code.
+        os._exit(127)
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index d6cf433..d3b95f9 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -27,6 +27,7 @@
 from autotest_lib.frontend.afe import models
 from autotest_lib.scheduler import agent_task, drone_manager
 from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
+from autotest_lib.scheduler import luciferlib
 from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
 from autotest_lib.scheduler import postjob_task
 from autotest_lib.scheduler import query_managers
@@ -345,6 +346,9 @@
             with breakdown_timer.Step('trigger_refresh'):
                 self._log_tick_msg('Starting _drone_manager.trigger_refresh')
                 _drone_manager.trigger_refresh()
+            if luciferlib.is_lucifer_enabled():
+                with breakdown_timer.Step('send_to_lucifer'):
+                    self._send_to_lucifer()
             with breakdown_timer.Step('schedule_running_host_queue_entries'):
                 self._schedule_running_host_queue_entries()
             with breakdown_timer.Step('schedule_special_tasks'):
@@ -514,12 +518,18 @@
 
         @return: A list of AgentTasks.
         """
-        # host queue entry statuses handled directly by AgentTasks (Verifying is
-        # handled through SpecialTasks, so is not listed here)
-        statuses = (models.HostQueueEntry.Status.STARTING,
-                    models.HostQueueEntry.Status.RUNNING,
-                    models.HostQueueEntry.Status.GATHERING,
-                    models.HostQueueEntry.Status.PARSING)
+        if luciferlib.is_lucifer_enabled():
+            statuses = (models.HostQueueEntry.Status.STARTING,
+                        models.HostQueueEntry.Status.RUNNING,
+                        models.HostQueueEntry.Status.GATHERING)
+        else:
+            # host queue entry statuses handled directly by AgentTasks
+            # (Verifying is handled through SpecialTasks, so is not
+            # listed here)
+            statuses = (models.HostQueueEntry.Status.STARTING,
+                        models.HostQueueEntry.Status.RUNNING,
+                        models.HostQueueEntry.Status.GATHERING,
+                        models.HostQueueEntry.Status.PARSING)
         status_list = ','.join("'%s'" % status for status in statuses)
         queue_entries = scheduler_models.HostQueueEntry.fetch(
                 where='status IN (%s)' % status_list)
@@ -899,6 +909,35 @@
 
 
     @_calls_log_tick_msg
+    def _send_to_lucifer(self):
+        """Hand off ownership of PARSING jobs to the lucifer component.
+
+        Each job handed off gets a JobHandoff row so it is not sent twice.
+        """
+        Status = models.HostQueueEntry.Status
+        queue_entries_qs = (scheduler_models.HostQueueEntry.objects
+                            .filter(status=Status.PARSING))
+        for queue_entry in queue_entries_qs:
+            # HQEs that already have an agent stay owned by monitor_db.
+            if self.get_agents_for_entry(queue_entry):
+                continue
+
+            job = queue_entry.job
+            if luciferlib.is_lucifer_owned(job):
+                continue
+            task = postjob_task.PostJobTask(
+                    [queue_entry], log_file_name='/dev/null')
+            pidfile_id = task._autoserv_monitor.pidfile_id
+            autoserv_exit = task._autoserv_monitor.exit_code()
+            luciferlib.spawn_job_handler(
+                    manager=_drone_manager,
+                    job=job,
+                    autoserv_exit=autoserv_exit,
+                    pidfile_id=pidfile_id)
+            models.JobHandoff.objects.create(job=job)
+
+
+    @_calls_log_tick_msg
     def _schedule_running_host_queue_entries(self):
         """
         Adds agents to the dispatcher.