Attached is a very large patch that adds support for running a
distributed Autotest service.  Previously, the scheduler could only
execute autoserv processes locally, and all results were written
directly to the local filesystem.  This limited the number of machines
a single Autotest service instance could test concurrently, due to the
strain of running many autoserv processes on a single machine.

With this change, the scheduler can spread autoserv processes across a
number of machines and gather all results to a single results
repository machine, vastly improving the scalability of a single
Autotest service instance.  See
http://autotest.kernel.org/wiki/DistributedServerSetup for more
details.
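
Under the hood the drone protocol is deliberately simple: the
scheduler pickles a list of method calls and pipes them to
drone_utility.py on the drone (over ssh for remote drones, in-process
for localhost), then unpickles a dict of results and warnings from the
drone's stdout.  Here is a minimal sketch of one round trip; the
standalone invocation and paths are illustrative, not how the
scheduler actually launches it:

    import pickle, subprocess
    from autotest_lib.scheduler import drone_utility

    # queue up calls the same way DroneManager does
    calls = [drone_utility.call('refresh', []),
             drone_utility.call('write_to_file', '/tmp/example', 'hi\n')]

    # a remote drone would receive this over ssh; run locally here
    proc = subprocess.Popen(['python', 'scheduler/drone_utility.py'],
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    stdout, _ = proc.communicate(pickle.dumps(calls))
    response = pickle.loads(stdout)
    # response == {'results': [...], 'warnings': [...]}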

Note that the single-server setup is still supported, and the global
configuration defaults to it, so existing service instances should
continue to run.
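
For example, a two-drone setup might look like this in
global_config.ini (hostnames are illustrative; these are the new
SCHEDULER options added by this patch):

    [SCHEDULER]
    drones: drone1.example.com, drone2.example.com
    drone_installation_directory: /usr/local/autotest
    results_host: results.example.com
    max_transfer_processes: 50

Each drone needs an Autotest installation at
drone_installation_directory, and the scheduler must be able to ssh to
every drone and to the results host.  A per-host ssh username can be
set with a <hostname>_username option in the same section; it defaults
to the current user.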

Steve

Signed-off-by: Steve Howard <showard@google.com>




git-svn-id: http://test.kernel.org/svn/autotest/trunk@2596 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/apache/drone-conf b/apache/drone-conf
new file mode 100644
index 0000000..4702d18
--- /dev/null
+++ b/apache/drone-conf
@@ -0,0 +1,18 @@
+NameVirtualHost *
+<VirtualHost *>
+    ErrorLog /var/log/apache2/error.log
+    LogLevel warn
+
+    CustomLog /var/log/apache2/access.log combined
+    ServerSignature On
+
+    Alias /results "/usr/local/autotest/results/"
+    <Directory /usr/local/autotest/results/>
+    Options Indexes FollowSymLinks MultiViews
+    AllowOverride None
+    Order allow,deny
+    Allow from all
+    </Directory>
+
+
+</VirtualHost>
diff --git a/client/common_lib/test_utils/mock.py b/client/common_lib/test_utils/mock.py
index b734e83..0223d82 100644
--- a/client/common_lib/test_utils/mock.py
+++ b/client/common_lib/test_utils/mock.py
@@ -80,6 +80,8 @@
 
 
     def __str__(self):
+        if isinstance(self.value, argument_comparator):
+            return str(self.value)
         return repr(self.value)
 
 
@@ -118,6 +120,15 @@
         return "is a %s" % self.cls
 
 
+class anything_comparator(argument_comparator):
+    def is_satisfied_by(self, parameter):
+        return True
+
+
+    def __str__(self):
+        return 'anything'
+
+
 class base_mapping(object):
     def __init__(self, symbol, return_obj, *args, **dargs):
         self.return_obj = return_obj
@@ -421,14 +432,14 @@
         if len(self.recording) != 0:
             func_call = self.recording[0]
             if func_call.symbol != symbol:
-                msg = ("Unexpected call: %s. Expected %s"
+                msg = ("Unexpected call: %s\nExpected: %s"
                     % (_dump_function_call(symbol, args, dargs),
                        func_call))
                 self._append_error(msg)
                 return None
 
             if not func_call.match(*args, **dargs):
-                msg = ("%s called. Expected %s"
+                msg = ("Incorrect call: %s\nExpected: %s"
                     % (_dump_function_call(symbol, args, dargs),
                       func_call))
                 self._append_error(msg)
diff --git a/client/common_lib/utils.py b/client/common_lib/utils.py
index 77d7e81..cde942f 100644
--- a/client/common_lib/utils.py
+++ b/client/common_lib/utils.py
@@ -21,7 +21,8 @@
 
 
 class BgJob(object):
-    def __init__(self, command, stdout_tee=None, stderr_tee=None, verbose=True):
+    def __init__(self, command, stdout_tee=None, stderr_tee=None, verbose=True,
+                 stdin=None):
         self.command = command
         self.stdout_tee = stdout_tee
         self.stderr_tee = stderr_tee
@@ -32,7 +33,8 @@
         self.sp = subprocess.Popen(command, stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE,
                                    preexec_fn=self._reset_sigpipe, shell=True,
-                                   executable="/bin/bash")
+                                   executable="/bin/bash",
+                                   stdin=stdin)
 
 
     def output_prepare(self, stdout_file=None, stderr_file=None):
@@ -300,7 +302,7 @@
 
 
 def run(command, timeout=None, ignore_status=False,
-        stdout_tee=None, stderr_tee=None, verbose=True):
+        stdout_tee=None, stderr_tee=None, verbose=True, stdin=None):
     """
     Run a command on the host.
 
@@ -316,6 +318,7 @@
                         will be written as it is generated (data will still
                         be stored in result.stdout)
             stderr_tee: likewise for stderr
+            stdin: stdin to pass to the executed process
 
     Returns:
             a CmdResult object
@@ -324,8 +327,9 @@
             CmdError: the exit code of the command
                     execution was not 0
     """
-    bg_job = join_bg_jobs((BgJob(command, stdout_tee, stderr_tee, verbose),),
-                          timeout)[0]
+    bg_job = join_bg_jobs(
+        (BgJob(command, stdout_tee, stderr_tee, verbose, stdin=stdin),),
+        timeout)[0]
     if not ignore_status and bg_job.result.exit_status:
         raise error.CmdError(command, bg_job.result,
                              "Command returned non-zero exit status")
diff --git a/global_config.ini b/global_config.ini
index 7e70cba..0e023c9 100644
--- a/global_config.ini
+++ b/global_config.ini
@@ -32,9 +32,13 @@
 max_running_jobs: 1000
 max_jobs_started_per_cycle: 100
 max_parse_processes: 5
+max_transfer_processes: 50
 tick_pause_sec: 5
 clean_interval_minutes: 60
 synch_job_start_timeout_minutes: 150
+drones: localhost
+drone_installation_directory: /usr/local/autotest
+results_host: localhost
 
 [HOSTS]
 wait_up_processes:
diff --git a/scheduler/drone_manager.py b/scheduler/drone_manager.py
new file mode 100644
index 0000000..826a62c
--- /dev/null
+++ b/scheduler/drone_manager.py
@@ -0,0 +1,480 @@
+import os, re, shutil, signal, subprocess, errno, time, heapq, traceback
+import common
+from autotest_lib.client.common_lib import error
+from autotest_lib.scheduler import email_manager, drone_utility, drones
+
+_AUTOSERV_PID_FILE = '.autoserv_execute'
+
+
+class DroneManagerError(Exception):
+    pass
+
+
+class CustomEquals(object):
+    def _id(self):
+        raise NotImplementedError
+
+
+    def __eq__(self, other):
+        if not isinstance(other, type(self)):
+            return NotImplemented
+        return self._id() == other._id()
+
+
+    def __ne__(self, other):
+        return not self == other
+
+
+    def __hash__(self):
+        return hash(self._id())
+
+
+class Process(CustomEquals):
+    def __init__(self, hostname, pid, ppid=None):
+        self.hostname = hostname
+        self.pid = pid
+        self.ppid = ppid
+
+    def _id(self):
+        return (self.hostname, self.pid)
+
+
+    def __str__(self):
+        return '%s/%s' % (self.hostname, self.pid)
+
+
+    def __repr__(self):
+        return super(Process, self).__repr__() + '<%s>' % self
+
+
+class PidfileId(CustomEquals):
+    def __init__(self, path):
+        self.path = path
+
+
+    def _id(self):
+        return self.path
+
+
+    def __str__(self):
+        return str(self.path)
+
+
+class PidfileContents(object):
+    process = None
+    exit_status = None
+    num_tests_failed = None
+
+    def is_invalid(self):
+        return False
+
+
+class InvalidPidfile(object):
+    def __init__(self, error):
+        self.error = error
+
+
+    def is_invalid(self):
+        return True
+
+
+    def __str__(self):
+        return self.error
+
+
+class DroneManager(object):
+    """
+    This class acts as an interface from the scheduler to drones, whether it be
+    only a single "drone" for localhost or multiple remote drones.
+
+    All paths going into and out of this class are relative to the full results
+    directory, except for those returned by absolute_path().
+    """
+    _MAX_PIDFILE_AGE = 1000
+    _NULL_HOSTNAME = 'null host'
+
+    def __init__(self):
+        self._results_dir = None
+        self._processes = {}
+        self._process_set = set()
+        self._pidfiles = {}
+        self._pidfiles_second_read = {}
+        self._pidfile_age = {}
+        self._temporary_path_counter = 0
+        self._drones = {}
+        self._results_drone = None
+        self._attached_files = {}
+        self._drone_queue = []
+        self._null_drone = drones.NullDrone()
+
+
+    def initialize(self, base_results_dir, drone_hostnames,
+                   results_repository_hostname):
+        self._results_dir = base_results_dir
+        drones.set_temporary_directory(os.path.join(
+            base_results_dir, drone_utility._TEMPORARY_DIRECTORY))
+
+        for hostname in drone_hostnames:
+            try:
+                drone = self._add_drone(hostname)
+                drone.call('initialize', base_results_dir)
+            except error.AutoservError:
+                warning = 'Drone %s failed to initialize:\n%s' % (
+                    hostname, traceback.format_exc())
+                print warning
+                email_manager.manager.enqueue_notify_email(
+                    'Drone failed to initialize', warning)
+                self._remove_drone(hostname)
+
+        if not self._drones:
+            # all drones failed to initialize
+            raise DroneManagerError('No valid drones found')
+
+        print 'Using results repository on', results_repository_hostname
+        self._results_drone = drones.get_drone(results_repository_hostname)
+        # don't initialize() the results drone - we don't want to clear out any
+        # directories and we don't need to kill any processes
+
+
+    def reinitialize_drones(self):
+        self._call_all_drones('initialize', self._results_dir)
+
+
+    def shutdown(self):
+        for drone in self._drones.itervalues():
+            drone.shutdown()
+
+
+    def _add_drone(self, hostname):
+        print 'Adding drone', hostname
+        drone = drones.get_drone(hostname)
+        self._drones[drone.hostname] = drone
+        return drone
+
+
+    def _remove_drone(self, hostname):
+        self._drones.pop(hostname, None)
+
+
+    def _get_drone_for_process(self, process):
+        if process.hostname == self._NULL_HOSTNAME:
+            return self._null_drone
+        return self._drones[process.hostname]
+
+
+    def _get_drone_for_pidfile_id(self, pidfile_id):
+        pidfile_contents = self.get_pidfile_contents(pidfile_id)
+        assert pidfile_contents.process is not None
+        return self._get_drone_for_process(pidfile_contents.process)
+
+
+    def _drop_old_pidfiles(self):
+        for pidfile_id, age in self._pidfile_age.items():
+            if age > self._MAX_PIDFILE_AGE:
+                del self._pidfile_age[pidfile_id]
+            else:
+                self._pidfile_age[pidfile_id] += 1
+
+
+    def _reset(self):
+        self._processes = {}
+        self._process_set = set()
+        self._pidfiles = {}
+        self._pidfiles_second_read = {}
+        self._drone_queue = []
+
+
+    def _call_all_drones(self, method, *args, **kwargs):
+        all_results = {}
+        for drone in self._drones.itervalues():
+            all_results[drone] = drone.call(method, *args, **kwargs)
+        return all_results
+
+
+    def _parse_pidfile(self, drone, raw_contents):
+        contents = PidfileContents()
+        if not raw_contents:
+            return contents
+        lines = raw_contents.splitlines()
+        if len(lines) > 3:
+            return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
+                                  (len(lines), lines))
+        try:
+            pid = int(lines[0])
+            contents.process = Process(drone.hostname, pid)
+            # if len(lines) == 2, assume we caught Autoserv between writing
+            # exit_status and num_tests_failed, so just ignore it and wait for
+            # the next cycle
+            if len(lines) == 3:
+                contents.exit_status = int(lines[1])
+                contents.num_tests_failed = int(lines[2])
+        except ValueError, exc:
+            return InvalidPidfile('Corrupt pid file: ' + str(exc.args))
+
+        return contents
+
+
+    def _process_pidfiles(self, drone, pidfiles, store_in_dict):
+        for pidfile_path, contents in pidfiles.iteritems():
+            pidfile_id = PidfileId(pidfile_path)
+            contents = self._parse_pidfile(drone, contents)
+            store_in_dict[pidfile_id] = contents
+
+
+    def refresh(self):
+        """
+        Called at the beginning of a scheduler cycle to refresh all process
+        information.
+        """
+        self._reset()
+        pidfile_paths = [pidfile_id.path for pidfile_id in self._pidfile_age]
+        all_results = self._call_all_drones('refresh', pidfile_paths)
+
+        for drone, results_list in all_results.iteritems():
+            results = results_list[0]
+            process_count = len(results['processes'])
+            heapq.heappush(self._drone_queue, (process_count, drone))
+            for process_info in results['processes']:
+                # only root autoserv processes have pgid == pid
+                if process_info['pgid'] != process_info['pid']:
+                    continue
+                process = Process(drone.hostname, int(process_info['pid']),
+                                  int(process_info['ppid']))
+                execution_tag = self._execution_tag_for_process(drone,
+                                                                process_info)
+                self._processes[execution_tag] = process
+                self._process_set.add(process)
+
+            self._process_pidfiles(drone, results['pidfiles'], self._pidfiles)
+            self._process_pidfiles(drone, results['pidfiles_second_read'],
+                                   self._pidfiles_second_read)
+
+
+    def _execution_tag_for_process(self, drone, process_info):
+        execution_tag = self._extract_execution_tag(process_info['args'])
+        if not execution_tag:
+            # this process has no execution tag - just make up something unique
+            return '%s.%s' % (drone, process_info['pid'])
+        return execution_tag
+
+
+    def _extract_execution_tag(self, command):
+        match = re.match(r'.* -P (\S+) ', command)
+        if not match:
+            return None
+        return match.group(1)
+
+
+    def execute_actions(self):
+        """
+        Called at the end of a scheduler cycle to execute all queued actions
+        on drones.
+        """
+        for drone in self._drones.values():
+            drone.execute_queued_calls()
+
+        try:
+            self._results_drone.execute_queued_calls()
+        except error.AutoservError:
+            warning = ('Results repository failed to execute calls:\n' +
+                       traceback.format_exc())
+            print warning
+            email_manager.manager.enqueue_notify_email(
+                'Results repository error', warning)
+            self._results_drone.clear_call_queue()
+
+
+    def get_orphaned_autoserv_processes(self):
+        """
+        Returns a dict mapping execution tags to Process objects for
+        orphaned autoserv processes only.
+        """
+        return dict((execution_tag, process)
+                    for execution_tag, process in self._processes.iteritems()
+                    if process.ppid == 1)
+
+
+    def get_process_for(self, execution_tag):
+        """
+        Return the process object for the given execution tag.
+        """
+        return self._processes.get(execution_tag, None)
+
+
+    def get_dummy_process(self):
+        """
+        Return a null process object.
+        """
+        return Process(self._NULL_HOSTNAME, 0)
+
+
+    def kill_process(self, process):
+        """
+        Kill the given process.
+        """
+        print 'killing', process
+        drone = self._get_drone_for_process(process)
+        drone.queue_call('kill_process', process)
+
+
+    def _ensure_directory_exists(self, path):
+        if not os.path.exists(path):
+            os.makedirs(path)
+
+
+    def _extract_num_processes(self, command):
+        try:
+            machine_list_index = command.index('-m') + 1
+        except ValueError:
+            return 1
+        assert machine_list_index < len(command)
+        machine_list = command[machine_list_index].split(',')
+        return len(machine_list)
+
+
+    def _choose_drone_for_execution(self, num_processes):
+        processes, drone_to_use = heapq.heappop(self._drone_queue)
+        heapq.heappush(self._drone_queue,
+                       (processes + num_processes, drone_to_use))
+        return drone_to_use
+
+
+    def execute_command(self, command, working_directory, log_file=None,
+                        pidfile_name=None, paired_with_pidfile=None):
+        """
+        Execute the given command, taken as an argv list.
+
+        * working_directory: directory in which the pidfile will be written
+        * log_file (optional): specifies a path (in the results repository) to
+          hold command output.
+        * pidfile_name (optional): gives the name of the pidfile this process
+          will write
+        * paired_with_pidfile (optional): a PidfileId for an already-executed
+          process; the new process will execute on the same drone as the
+          previous process.
+        """
+        working_directory = self.absolute_path(working_directory)
+        if not log_file:
+            log_file = self.get_temporary_path('execute')
+        log_file = self.absolute_path(log_file)
+        if not pidfile_name:
+            pidfile_name = _AUTOSERV_PID_FILE
+
+        if paired_with_pidfile:
+            drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
+        else:
+            num_processes = self._extract_num_processes(command)
+            drone = self._choose_drone_for_execution(num_processes)
+        print "command = %s" % command
+        print 'log file = %s:%s' % (drone.hostname, log_file)
+        self._write_attached_files(command, drone)
+        drone.queue_call('execute_command', command, working_directory,
+                         log_file, pidfile_name)
+
+        pidfile_path = self.absolute_path(os.path.join(working_directory,
+                                                       pidfile_name))
+        pidfile_id = PidfileId(pidfile_path)
+        self.register_pidfile(pidfile_id)
+        return pidfile_id
+
+
+    def get_pidfile_id_from(self, execution_tag):
+        path = os.path.join(self.absolute_path(execution_tag),
+                            _AUTOSERV_PID_FILE)
+        return PidfileId(path)
+
+
+    def register_pidfile(self, pidfile_id):
+        """
+        Indicate that the DroneManager should look for the given pidfile when
+        refreshing.
+        """
+        self._pidfile_age[pidfile_id] = 0
+
+
+    def get_pidfile_contents(self, pidfile_id, use_second_read=False):
+        """
+        Retrieve a PidfileContents object for the given pidfile_id.  If
+        use_second_read is True, use results that were read after the processes
+        were checked, instead of before.
+        """
+        self.register_pidfile(pidfile_id)
+        if use_second_read:
+            pidfile_map = self._pidfiles_second_read
+        else:
+            pidfile_map = self._pidfiles
+        return pidfile_map.get(pidfile_id, PidfileContents())
+
+
+    def is_process_running(self, process):
+        """
+        Check if the given process is in the running process list.
+        """
+        return process in self._process_set
+
+
+    def get_temporary_path(self, base_name):
+        """
+        Get a new temporary path guaranteed to be unique across all drones
+        for this scheduler execution.
+        """
+        self._temporary_path_counter += 1
+        return os.path.join(drone_utility._TEMPORARY_DIRECTORY,
+                            '%s.%s' % (base_name, self._temporary_path_counter))
+
+
+    def absolute_path(self, path):
+        return os.path.join(self._results_dir, path)
+
+
+    def copy_to_results_repository(self, process, source_path,
+                                   destination_path=None):
+        """
+        Copy results from the given process at source_path to destination_path
+        in the results repository.
+        """
+        if destination_path is None:
+            destination_path = source_path
+        full_source = self.absolute_path(source_path)
+        full_destination = self.absolute_path(destination_path)
+        source_drone = self._get_drone_for_process(process)
+        source_drone.send_file_to(self._results_drone, full_source,
+                                  full_destination, can_fail=True)
+
+
+    def _write_attached_files(self, command, drone):
+        execution_tag = self._extract_execution_tag(' '.join(command))
+        attached_files = self._attached_files.pop(execution_tag, [])
+        for file_path, contents in attached_files:
+            drone.queue_call('write_to_file', self.absolute_path(file_path),
+                             contents)
+
+
+    def attach_file_to_execution(self, execution_tag, file_contents,
+                                 file_path=None):
+        """
+        When the process for execution_tag is executed, the given file contents
+        will be placed in a file on the drone.  Returns the path at which the
+        file will be placed.
+        """
+        if not file_path:
+            file_path = self.get_temporary_path('attach')
+        self._attached_files.setdefault(execution_tag, []).append(
+            (file_path, file_contents))
+        return file_path
+
+
+    def write_lines_to_file(self, file_path, lines, paired_with_pidfile=None):
+        """
+        Write the given lines (as a list of strings) to a file.  If
+        paired_with_pidfile is given, the file will be written on the drone
+        running the given PidfileId.  Otherwise, the file will be written to the
+        results repository.
+        """
+        full_path = os.path.join(self._results_dir, file_path)
+        file_contents = '\n'.join(lines) + '\n'
+        if paired_with_pidfile:
+            drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
+        else:
+            drone = self._results_drone
+        drone.queue_call('write_to_file', full_path, file_contents)
diff --git a/scheduler/drone_utility.py b/scheduler/drone_utility.py
new file mode 100644
index 0000000..7e69ba0
--- /dev/null
+++ b/scheduler/drone_utility.py
@@ -0,0 +1,297 @@
+#!/usr/bin/python2.4
+
+import pickle, subprocess, os, shutil, socket, sys, time, signal, getpass
+import datetime, traceback
+import common
+from autotest_lib.client.common_lib import utils, global_config, error
+from autotest_lib.server import hosts, subcommand
+from autotest_lib.scheduler import email_manager
+
+_TEMPORARY_DIRECTORY = 'drone_tmp'
+_TRANSFER_FAILED_FILE = '.transfer_failed'
+
+class _MethodCall(object):
+    def __init__(self, method, args, kwargs):
+        self._method = method
+        self._args = args
+        self._kwargs = kwargs
+
+
+    def execute_on(self, drone_utility):
+        method = getattr(drone_utility, self._method)
+        return method(*self._args, **self._kwargs)
+
+
+    def __str__(self):
+        args = ', '.join(repr(arg) for arg in self._args)
+        kwargs = ', '.join('%s=%r' % (key, value) for key, value in
+                           self._kwargs.iteritems())
+        full_args = ', '.join(item for item in (args, kwargs) if item)
+        return '%s(%s)' % (self._method, full_args)
+
+
+def call(method, *args, **kwargs):
+    return _MethodCall(method, args, kwargs)
+
+
+class DroneUtility(object):
+    """
+    This class executes actual OS calls on the drone machine.
+
+    All paths going into and out of this class are absolute.
+    """
+    _PS_ARGS = ['pid', 'pgid', 'ppid', 'comm', 'args']
+    _MAX_TRANSFER_PROCESSES = global_config.global_config.get_config_value(
+        'SCHEDULER', 'max_transfer_processes', type=int)
+    _WARNING_DURATION = 60
+
+    def __init__(self):
+        self.warnings = []
+        self._subcommands = []
+
+
+    def initialize(self, results_dir):
+        temporary_directory = os.path.join(results_dir, _TEMPORARY_DIRECTORY)
+        if os.path.exists(temporary_directory):
+            shutil.rmtree(temporary_directory)
+        self._ensure_directory_exists(temporary_directory)
+
+        # make sure there are no old parsers running
+        os.system('killall parse')
+
+
+    def _warn(self, warning):
+        self.warnings.append(warning)
+
+
+    def _refresh_autoserv_processes(self):
+        ps_proc = subprocess.Popen(
+            ['/bin/ps', 'x', '-o', ','.join(self._PS_ARGS)],
+            stdout=subprocess.PIPE)
+        ps_output = ps_proc.communicate()[0]
+
+        # split each line into the columns output by ps
+        split_lines = [line.split(None, 4) for line in ps_output.splitlines()]
+        process_infos = [dict(zip(self._PS_ARGS, line_components))
+                         for line_components in split_lines]
+
+        processes = []
+        for info in process_infos:
+            if info['comm'] == 'autoserv':
+                processes.append(info)
+
+        return processes
+
+
+    def _read_pidfiles(self, pidfile_paths):
+        pidfiles = {}
+        for pidfile_path in pidfile_paths:
+            if not os.path.exists(pidfile_path):
+                continue
+            try:
+                file_object = open(pidfile_path, 'r')
+                pidfiles[pidfile_path] = file_object.read()
+                file_object.close()
+            except IOError:
+                continue
+        return pidfiles
+
+
+    def refresh(self, pidfile_paths):
+        results = {
+            'pidfiles' : self._read_pidfiles(pidfile_paths),
+            'processes' : self._refresh_autoserv_processes(),
+            'pidfiles_second_read' : self._read_pidfiles(pidfile_paths),
+        }
+        return results
+
+
+    def _is_process_running(self, process):
+        # TODO: enhance this to check the process args
+        proc_path = os.path.join('/proc', str(process.pid))
+        return os.path.exists(proc_path)
+
+
+    def kill_process(self, process):
+        if self._is_process_running(process):
+            os.kill(process.pid, signal.SIGCONT)
+            os.kill(process.pid, signal.SIGTERM)
+
+
+    def _ensure_directory_exists(self, path):
+        if not os.path.exists(path):
+            os.makedirs(path)
+
+
+    def execute_command(self, command, working_directory, log_file,
+                        pidfile_name):
+        out_file = None
+        if log_file:
+            self._ensure_directory_exists(os.path.dirname(log_file))
+            try:
+                out_file = open(log_file, 'a')
+                separator = ('*' * 80) + '\n'
+                out_file.write('\n' + separator)
+                out_file.write("%s> %s\n" % (time.strftime("%X %x"), command))
+                out_file.write(separator)
+            except (OSError, IOError):
+                email_manager.manager.log_stacktrace(
+                    'Error opening log file %s' % log_file)
+
+        if not out_file:
+            out_file = open('/dev/null', 'w')
+
+        in_devnull = open('/dev/null', 'r')
+
+        self._ensure_directory_exists(working_directory)
+        pidfile_path = os.path.join(working_directory, pidfile_name)
+        if os.path.exists(pidfile_path):
+            self._warn('Pidfile %s already exists' % pidfile_path)
+            os.remove(pidfile_path)
+
+        subprocess.Popen(command, stdout=out_file, stderr=subprocess.STDOUT,
+                         stdin=in_devnull)
+        out_file.close()
+        in_devnull.close()
+
+
+    def write_to_file(self, file_path, contents):
+        self._ensure_directory_exists(os.path.dirname(file_path))
+        try:
+            file_object = open(file_path, 'a')
+            file_object.write(contents)
+            file_object.close()
+        except IOError, exc:
+            self._warn('Error writing to file %s: %s' % (file_path, exc))
+
+
+    def copy_file(self, source_path, destination_path):
+        if source_path == destination_path:
+            return
+        self._ensure_directory_exists(os.path.dirname(destination_path))
+        shutil.copy(source_path, destination_path)
+
+
+    def run_async_command(self, function, args):
+        subproc = subcommand.subcommand(function, args)
+        self._subcommands.append(subproc)
+        subproc.fork_start()
+
+
+    def wait_for_async_commands(self):
+        for subproc in self._subcommands:
+            subproc.fork_waitfor()
+        self._subcommands = []
+
+
+    def _sync_get_file_from(self, hostname, source_path, destination_path):
+        self._ensure_directory_exists(os.path.dirname(destination_path))
+        host = create_host(hostname)
+        host.get_file(source_path, destination_path, delete_dest=True)
+
+
+    def get_file_from(self, hostname, source_path, destination_path):
+        self.run_async_command(self._sync_get_file_from,
+                               (hostname, source_path, destination_path))
+
+
+    def _sync_send_file_to(self, hostname, source_path, destination_path,
+                           can_fail):
+        host = create_host(hostname)
+        try:
+            host.run('mkdir -p ' + os.path.dirname(destination_path))
+            host.send_file(source_path, destination_path, delete_dest=True)
+        except error.AutoservError:
+            if not can_fail:
+                raise
+
+            if os.path.isdir(source_path):
+                failed_file = os.path.join(source_path, _TRANSFER_FAILED_FILE)
+                file_object = open(failed_file, 'w')
+                try:
+                    file_object.write('%s:%s\n%s\n%s' %
+                                      (hostname, destination_path,
+                                       datetime.datetime.now(),
+                                       traceback.format_exc()))
+                finally:
+                    file_object.close()
+            else:
+                copy_to = destination_path + _TRANSFER_FAILED_FILE
+                self._ensure_directory_exists(os.path.dirname(copy_to))
+                self.copy_file(source_path, copy_to)
+
+
+    def send_file_to(self, hostname, source_path, destination_path,
+                     can_fail=False):
+        self.run_async_command(self._sync_send_file_to,
+                               (hostname, source_path, destination_path,
+                                can_fail))
+
+
+    def _report_long_execution(self, calls, duration):
+        call_count = {}
+        for call in calls:
+            call_count.setdefault(call._method, 0)
+            call_count[call._method] += 1
+        call_summary = '\n'.join('%d %s' % (count, method)
+                                 for method, count in call_count.iteritems())
+        self._warn('Execution took %f sec\n%s' % (duration, call_summary))
+
+
+    def execute_calls(self, calls):
+        results = []
+        start_time = time.time()
+        for method_call in calls:
+            results.append(method_call.execute_on(self))
+            if len(self._subcommands) >= self._MAX_TRANSFER_PROCESSES:
+                self.wait_for_async_commands()
+        self.wait_for_async_commands()
+
+        duration = time.time() - start_time
+        if duration > self._WARNING_DURATION:
+            self._report_long_execution(calls, duration)
+
+        warnings = self.warnings
+        self.warnings = []
+        return dict(results=results, warnings=warnings)
+
+
+def create_host(hostname):
+    username = global_config.global_config.get_config_value(
+        'SCHEDULER', hostname + '_username', default=getpass.getuser())
+    return hosts.SSHHost(hostname, user=username)
+
+
+def parse_input():
+    input_chunks = []
+    chunk_of_input = sys.stdin.read()
+    while chunk_of_input:
+        input_chunks.append(chunk_of_input)
+        chunk_of_input = sys.stdin.read()
+    pickled_input = ''.join(input_chunks)
+
+    try:
+        return pickle.loads(pickled_input)
+    except Exception, exc:
+        separator = '*' * 50
+        raise ValueError('Unpickling input failed\n'
+                         'Input: %r\n'
+                         'Exception from pickle:\n'
+                         '%s\n%s\n%s' %
+                         (pickled_input, separator, traceback.format_exc(),
+                          separator))
+
+
+def return_data(data):
+    print pickle.dumps(data)
+
+
+def main():
+    calls = parse_input()
+    drone_utility = DroneUtility()
+    return_value = drone_utility.execute_calls(calls)
+    return_data(return_value)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/scheduler/drones.py b/scheduler/drones.py
new file mode 100644
index 0000000..f687cd8
--- /dev/null
+++ b/scheduler/drones.py
@@ -0,0 +1,172 @@
+import pickle, os, tempfile
+import common
+from autotest_lib.scheduler import drone_utility, email_manager
+from autotest_lib.server import hosts
+from autotest_lib.client.common_lib import error, global_config
+
+
+AUTOTEST_INSTALL_DIR = global_config.global_config.get_config_value('SCHEDULER',
+                                                 'drone_installation_directory')
+
+class _AbstractDrone(object):
+    def __init__(self):
+        self._calls = []
+        self.hostname = None
+
+
+    def shutdown(self):
+        pass
+
+
+    def _execute_calls_impl(self, calls):
+        raise NotImplementedError
+
+
+    def _execute_calls(self, calls):
+        return_message = self._execute_calls_impl(calls)
+        for warning in return_message['warnings']:
+            subject = 'Warning from drone %s' % self.hostname
+            print subject + '\n' + warning
+            email_manager.manager.enqueue_notify_email(subject, warning)
+        return return_message['results']
+
+
+    def call(self, method, *args, **kwargs):
+        return self._execute_calls(
+            [drone_utility.call(method, *args, **kwargs)])
+
+
+    def queue_call(self, method, *args, **kwargs):
+        self._calls.append(drone_utility.call(method, *args, **kwargs))
+
+    def clear_call_queue(self):
+        self._calls = []
+
+
+    def execute_queued_calls(self):
+        if not self._calls:
+            return
+        self._execute_calls(self._calls)
+        self.clear_call_queue()
+
+
+class _LocalDrone(_AbstractDrone):
+    def __init__(self):
+        super(_LocalDrone, self).__init__()
+        self.hostname = 'localhost'
+        self._drone_utility = drone_utility.DroneUtility()
+
+
+    def _execute_calls_impl(self, calls):
+        return self._drone_utility.execute_calls(calls)
+
+
+    def send_file_to(self, drone, source_path, destination_path,
+                     can_fail=False):
+        if drone.hostname == self.hostname:
+            self.queue_call('copy_file', source_path, destination_path)
+        else:
+            self.queue_call('send_file_to', drone.hostname, source_path,
+                            destination_path, can_fail)
+
+
+class _RemoteDrone(_AbstractDrone):
+    _temporary_directory = None
+
+    def __init__(self, hostname):
+        super(_RemoteDrone, self).__init__()
+        self.hostname = hostname
+        self._host = drone_utility.create_host(hostname)
+        self._drone_utility_path = os.path.join(AUTOTEST_INSTALL_DIR,
+                                                'scheduler',
+                                                'drone_utility.py')
+
+        try:
+            self._host.run('mkdir -p ' + self._temporary_directory,
+                           timeout=10)
+        except error.AutoservError:
+            pass
+
+
+    @classmethod
+    def set_temporary_directory(cls, temporary_directory):
+        cls._temporary_directory = temporary_directory
+
+
+    def shutdown(self):
+        super(_RemoteDrone, self).shutdown()
+        self._host.close()
+
+
+    def _execute_calls_impl(self, calls):
+        calls_fd, calls_filename = tempfile.mkstemp(suffix='.pickled_calls')
+        calls_file = os.fdopen(calls_fd, 'w+')
+        pickle.dump(calls, calls_file)
+        calls_file.flush()
+        calls_file.seek(0)
+
+        try:
+            result = self._host.run('python %s' % self._drone_utility_path,
+                                    stdin=calls_file)
+        finally:
+            calls_file.close()
+            os.remove(calls_filename)
+
+        try:
+            return pickle.loads(result.stdout)
+        except Exception: # pickle.loads can throw all kinds of exceptions
+            print 'Invalid response:\n---\n%s\n---' % result.stdout
+            raise
+
+
+    def send_file_to(self, drone, source_path, destination_path,
+                     can_fail=False):
+        if drone.hostname == self.hostname:
+            self.queue_call('copy_file', source_path, destination_path)
+        elif isinstance(drone, _LocalDrone):
+            drone.queue_call('get_file_from', self.hostname, source_path,
+                             destination_path)
+        else:
+            self.queue_call('send_file_to', drone.hostname, source_path,
+                            destination_path, can_fail)
+
+
+class NullDrone(object):
+    """
+    Null object utility for processes that have failed to run.
+    """
+    def shutdown(self):
+        pass
+
+
+    def call(self, *args, **kwargs):
+        pass
+
+
+    def queue_call(self, *args, **kwargs):
+        pass
+
+
+    def clear_call_queue(self):
+        pass
+
+
+    def execute_queued_calls(self):
+        pass
+
+
+    def send_file_to(self, *args, **kwargs):
+        pass
+
+
+def set_temporary_directory(temporary_directory):
+    _RemoteDrone.set_temporary_directory(temporary_directory)
+
+
+def get_drone(hostname):
+    """
+    Use this factory method to get drone objects.
+    """
+    if hostname == 'localhost':
+        return _LocalDrone()
+    return _RemoteDrone(hostname)
diff --git a/scheduler/email_manager.py b/scheduler/email_manager.py
new file mode 100644
index 0000000..bc30f78
--- /dev/null
+++ b/scheduler/email_manager.py
@@ -0,0 +1,72 @@
+import traceback, socket, os, time, smtplib, re, sys, getpass
+import common
+from autotest_lib.client.common_lib import global_config
+
+CONFIG_SECTION = 'SCHEDULER'
+
+class EmailNotificationManager(object):
+    def __init__(self):
+        self._emails = []
+
+        self._from_address = global_config.global_config.get_config_value(
+            CONFIG_SECTION, "notify_email_from")
+        if not self._from_address:
+            self._from_address = getpass.getuser()
+
+        self._notify_address = global_config.global_config.get_config_value(
+            CONFIG_SECTION, "notify_email")
+
+
+    def send_email(self, to_string, subject, body):
+        """Mails out emails to the addresses listed in to_string.
+
+        to_string is split into a list which can be delimited by any of:
+            ';', ',', ':' or any whitespace
+        """
+        # Create list from string removing empty strings from the list.
+        to_list = [x for x in re.split(r'\s|,|;|:', to_string) if x]
+        if not to_list:
+            return
+
+        msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
+            self._from_address, ', '.join(to_list), subject, body)
+        try:
+            mailer = smtplib.SMTP('localhost')
+            try:
+                mailer.sendmail(self._from_address, to_list, msg)
+            finally:
+                mailer.quit()
+        except Exception:
+            print "Sending email failed:"
+            traceback.print_exc()
+
+
+    def enqueue_notify_email(self, subject, message):
+        if not self._notify_address:
+            return
+
+        body = 'Subject: ' + subject + '\n'
+        body += "%s / %s / %s\n%s" % (socket.gethostname(),
+                                      os.getpid(),
+                                      time.strftime("%X %x"), message)
+        self._emails.append(body)
+
+
+    def send_queued_emails(self):
+        if not self._emails:
+            return
+        subject = 'Scheduler notifications from ' + socket.gethostname()
+        separator = '\n' + '-' * 40 + '\n'
+        body = separator.join(self._emails)
+
+        self.send_email(self._notify_address, subject, body)
+        self._emails = []
+
+
+    def log_stacktrace(self, reason):
+        message = "EXCEPTION: %s\n%s" % (reason, traceback.format_exc())
+        sys.stderr.write("\n%s\n" % message)
+        self.enqueue_notify_email("monitor_db exception", message)
+
+
+manager = EmailNotificationManager()
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 4210d42..a47f520 100644
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -7,17 +7,20 @@
 
 import datetime, errno, MySQLdb, optparse, os, pwd, Queue, re, shutil, signal
 import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
+import itertools, logging
 import common
 from autotest_lib.frontend import setup_django_environment
 from autotest_lib.client.common_lib import global_config
 from autotest_lib.client.common_lib import host_protections, utils, debug
 from autotest_lib.database import database_connection
 from autotest_lib.frontend.afe import models
+from autotest_lib.scheduler import drone_manager, drones, email_manager
 
 
 RESULTS_DIR = '.'
 AUTOSERV_NICE_LEVEL = 10
-CONFIG_SECTION = 'AUTOTEST_WEB'
+CONFIG_SECTION = 'SCHEDULER'
+DB_CONFIG_SECTION = 'AUTOTEST_WEB'
 
 AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
 
@@ -29,20 +32,17 @@
 if AUTOTEST_SERVER_DIR not in sys.path:
     sys.path.insert(0, AUTOTEST_SERVER_DIR)
 
-AUTOSERV_PID_FILE = '.autoserv_execute'
 # how long to wait for autoserv to write a pidfile
 PIDFILE_TIMEOUT = 5 * 60 # 5 min
 
 _db = None
 _shutdown = False
-_notify_email = None
-_autoserv_path = 'autoserv'
+_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
+_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
 _testing_mode = False
-_global_config_section = 'SCHEDULER'
 _base_url = None
-# see os.getlogin() online docs
-_email_from = pwd.getpwuid(os.getuid())[0]
 _notify_email_statuses = []
+_drone_manager = drone_manager.DroneManager()
 
 
 def main():
@@ -65,26 +65,16 @@
     global RESULTS_DIR
     RESULTS_DIR = args[0]
 
-    # read in notify_email from global_config
     c = global_config.global_config
-    global _notify_email
-    val = c.get_config_value(_global_config_section, "notify_email")
-    if val != "":
-        _notify_email = val
-
-    global _email_from
-    val = c.get_config_value(_global_config_section, "notify_email_from")
-    if val != "":
-        _email_from = val
-
+    notify_statuses_list = c.get_config_value(CONFIG_SECTION,
+                                              "notify_email_statuses")
     global _notify_email_statuses
-    val = c.get_config_value(_global_config_section, "notify_email_statuses")
-    if val != "":
-        _notify_email_statuses = [status for status in
-                                 re.split(r'[\s,;:]', val.lower()) if status]
+    _notify_email_statuses = [status for status in
+                              re.split(r'[\s,;:]', notify_statuses_list.lower())
+                              if status]
 
-    tick_pause = c.get_config_value(
-        _global_config_section, 'tick_pause_sec', type=int)
+    tick_pause = c.get_config_value(CONFIG_SECTION, 'tick_pause_sec',
+                                   type=int)
 
     if options.test:
         global _autoserv_path
@@ -95,7 +85,8 @@
     # AUTOTEST_WEB.base_url is still a supported config option as some people
     # may wish to override the entire url.
     global _base_url
-    config_base_url = c.get_config_value(CONFIG_SECTION, 'base_url')
+    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
+                                         default='')
     if config_base_url:
         _base_url = config_base_url
     else:
@@ -116,9 +107,11 @@
             dispatcher.tick()
             time.sleep(tick_pause)
     except:
-        log_stacktrace("Uncaught exception; terminating monitor_db")
+        email_manager.manager.log_stacktrace(
+            "Uncaught exception; terminating monitor_db")
 
-    email_manager.send_queued_emails()
+    email_manager.manager.send_queued_emails()
+    _drone_manager.shutdown()
     _db.disconnect()
 
 
@@ -136,21 +129,32 @@
 
     if _testing_mode:
         global_config.global_config.override_config_value(
-            CONFIG_SECTION, 'database', 'stresstest_autotest_web')
+            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')
 
     os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
     global _db
-    _db = database_connection.DatabaseConnection(CONFIG_SECTION)
+    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
     _db.connect()
 
     # ensure Django connection is in autocommit
     setup_django_environment.enable_autocommit()
 
     debug.configure('scheduler', format_string='%(message)s')
+    debug.get_logger().setLevel(logging.WARNING)
 
     print "Setting signal handler"
     signal.signal(signal.SIGINT, handle_sigint)
 
+    drones = global_config.global_config.get_config_value(CONFIG_SECTION,
+                                                          'drones')
+    if drones:
+        drone_list = [hostname.strip() for hostname in drones.split(',')]
+    else:
+        drone_list = ['localhost']
+    results_host = global_config.global_config.get_config_value(
+        CONFIG_SECTION, 'results_host', default='localhost')
+    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)
+
     print "Connected! Running..."
 
 
@@ -176,98 +180,6 @@
     qe = [HostQueueEntry(row=i) for i in rows]
     return qe
 
-def remove_file_or_dir(path):
-    if stat.S_ISDIR(os.stat(path).st_mode):
-        # directory
-        shutil.rmtree(path)
-    else:
-        # file
-        os.remove(path)
-
-
-def log_stacktrace(reason):
-    (type, value, tb) = sys.exc_info()
-    str = "EXCEPTION: %s\n" % reason
-    str += ''.join(traceback.format_exception(type, value, tb))
-
-    sys.stderr.write("\n%s\n" % str)
-    email_manager.enqueue_notify_email("monitor_db exception", str)
-
-
-def get_proc_poll_fn(pid):
-    proc_path = os.path.join('/proc', str(pid))
-    def poll_fn():
-        if os.path.exists(proc_path):
-            return None
-        return 0 # we can't get a real exit code
-    return poll_fn
-
-
-def send_email(to_string, subject, body):
-    """Mails out emails to the addresses listed in to_string.
-
-    to_string is split into a list which can be delimited by any of:
-            ';', ',', ':' or any whitespace
-    """
-
-    # Create list from string removing empty strings from the list.
-    to_list = [x for x in re.split('\s|,|;|:', to_string) if x]
-    if not to_list:
-        return
-
-    msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
-        _email_from, ', '.join(to_list), subject, body)
-    try:
-        mailer = smtplib.SMTP('localhost')
-        try:
-            mailer.sendmail(_email_from, to_list, msg)
-        finally:
-            mailer.quit()
-    except Exception, e:
-        print "Sending email failed. Reason: %s" % repr(e)
-
-
-def kill_autoserv(pid, poll_fn=None):
-    print 'killing', pid
-    if poll_fn is None:
-        poll_fn = get_proc_poll_fn(pid)
-    if poll_fn() is None:
-        os.kill(pid, signal.SIGCONT)
-        os.kill(pid, signal.SIGTERM)
-
-
-def ensure_directory_exists(directory_path):
-    if not os.path.exists(directory_path):
-        os.makedirs(directory_path)
-
-
-class EmailNotificationManager(object):
-    def __init__(self):
-        self._emails = []
-
-    def enqueue_notify_email(self, subject, message):
-        if not _notify_email:
-            return
-
-        body = 'Subject: ' + subject + '\n'
-        body += "%s / %s / %s\n%s" % (socket.gethostname(),
-                                      os.getpid(),
-                                      time.strftime("%X %x"), message)
-        self._emails.append(body)
-
-
-    def send_queued_emails(self):
-        if not self._emails:
-            return
-        subject = 'Scheduler notifications from ' + socket.gethostname()
-        separator = '\n' + '-' * 40 + '\n'
-        body = separator.join(self._emails)
-
-        send_email(_notify_email, subject, body)
-        self._emails = []
-
-email_manager = EmailNotificationManager()
-
 
 class HostScheduler(object):
     def _get_ready_hosts(self):
@@ -466,25 +378,26 @@
         return self._schedule_metahost(queue_entry)
 
 
-class Dispatcher:
-    autoserv_procs_cache = None
+class Dispatcher(object):
     max_running_processes = global_config.global_config.get_config_value(
-        _global_config_section, 'max_running_jobs', type=int)
+        CONFIG_SECTION, 'max_running_jobs', type=int)
     max_processes_started_per_cycle = (
         global_config.global_config.get_config_value(
-            _global_config_section, 'max_jobs_started_per_cycle', type=int))
+            CONFIG_SECTION, 'max_jobs_started_per_cycle', type=int))
     clean_interval = (
         global_config.global_config.get_config_value(
-            _global_config_section, 'clean_interval_minutes', type=int))
+            CONFIG_SECTION, 'clean_interval_minutes', type=int))
     synch_job_start_timeout_minutes = (
         global_config.global_config.get_config_value(
-            _global_config_section, 'synch_job_start_timeout_minutes',
+            CONFIG_SECTION, 'synch_job_start_timeout_minutes',
             type=int))
 
     def __init__(self):
         self._agents = []
         self._last_clean_time = time.time()
         self._host_scheduler = HostScheduler()
+        self._host_agents = {}
+        self._queue_entry_agents = {}
 
 
     def do_initial_recovery(self, recover_hosts=True):
@@ -496,12 +409,13 @@
 
 
     def tick(self):
-        Dispatcher.autoserv_procs_cache = None
+        _drone_manager.refresh()
         self._run_cleanup_maybe()
         self._find_aborting()
         self._schedule_new_jobs()
         self._handle_agents()
-        email_manager.send_queued_emails()
+        _drone_manager.execute_actions()
+        email_manager.manager.send_queued_emails()
 
 
     def _run_cleanup_maybe(self):
@@ -514,21 +428,45 @@
             self._last_clean_time = time.time()
 
 
+    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
+        for object_id in object_ids:
+            agent_dict.setdefault(object_id, set()).add(agent)
+
+
+    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
+        for object_id in object_ids:
+            assert object_id in agent_dict
+            agent_dict[object_id].remove(agent)
+
+
     def add_agent(self, agent):
         self._agents.append(agent)
         agent.dispatcher = self
+        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
+        self._register_agent_for_ids(self._queue_entry_agents,
+                                     agent.queue_entry_ids, agent)
 
-    # Find agent corresponding to the specified queue_entry
-    def get_agents(self, queue_entry):
-        res_agents = []
-        for agent in self._agents:
-            if queue_entry.id in agent.queue_entry_ids:
-                res_agents.append(agent)
-        return res_agents
+
+    def get_agents_for_entry(self, queue_entry):
+        """
+        Find agents corresponding to the specified queue_entry.
+        """
+        return self._queue_entry_agents.get(queue_entry.id, set())
+
+
+    def host_has_agent(self, host):
+        """
+        Determine if there is currently an Agent present using this host.
+        """
+        return bool(self._host_agents.get(host.id, None))
 
 
     def remove_agent(self, agent):
         self._agents.remove(agent)
+        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
+                                       agent)
+        self._unregister_agent_for_ids(self._queue_entry_agents,
+                                       agent.queue_entry_ids, agent)
 
 
     def num_running_processes(self):
@@ -536,115 +474,108 @@
                    if agent.is_running())
 
 
-    @classmethod
-    def find_autoservs(cls, orphans_only=False):
-        """\
-        Returns a dict mapping pids to command lines for root autoserv
-        processes.  If orphans_only=True, return only processes that
-        have been orphaned (i.e. parent pid = 1).
-        """
-        if cls.autoserv_procs_cache is not None:
-            return cls.autoserv_procs_cache
-
-        proc = subprocess.Popen(
-            ['/bin/ps', 'x', '-o', 'pid,pgid,ppid,comm,args'],
-            stdout=subprocess.PIPE)
-        # split each line into the four columns output by ps
-        procs = [line.split(None, 4) for line in
-                 proc.communicate()[0].splitlines()]
-        autoserv_procs = {}
-        for proc in procs:
-            # check ppid == 1 for orphans
-            if orphans_only and proc[2] != 1:
-                continue
-            # only root autoserv processes have pgid == pid
-            if (proc[3] == 'autoserv' and   # comm
-                proc[1] == proc[0]):        # pgid == pid
-                # map pid to args
-                autoserv_procs[int(proc[0])] = proc[4]
-        cls.autoserv_procs_cache = autoserv_procs
-        return autoserv_procs
+    def _extract_execution_tag(self, command_line):
+        match = re.match(r'.* -P (\S+) ', command_line)
+        if not match:
+            return None
+        return match.group(1)
 
 
     def _recover_queue_entries(self, queue_entries, run_monitor):
         assert len(queue_entries) > 0
-        queue_entry_ids = [entry.id for entry in queue_entries]
         queue_task = RecoveryQueueTask(job=queue_entries[0].job,
                                        queue_entries=queue_entries,
                                        run_monitor=run_monitor)
         self.add_agent(Agent(tasks=[queue_task],
-                             queue_entry_ids=queue_entry_ids))
+                             num_processes=len(queue_entries)))
 
 
     def _recover_processes(self):
-        orphans = self.find_autoservs(orphans_only=True)
+        self._register_pidfiles()
+        _drone_manager.refresh()
+        self._recover_running_entries()
+        self._recover_aborting_entries()
+        self._requeue_other_active_entries()
+        self._recover_parsing_entries()
+        self._reverify_remaining_hosts()
+        # reinitialize drones after killing orphaned processes, since they can
+        # leave files behind when they die
+        _drone_manager.execute_actions()
+        _drone_manager.reinitialize_drones()
 
-        # first, recover running queue entries
-        rows = _db.execute("""SELECT * FROM host_queue_entries
-                              WHERE status = 'Running'""")
-        queue_entries = [HostQueueEntry(row=i) for i in rows]
-        requeue_entries = []
-        recovered_entry_ids = set()
+
+    def _register_pidfiles(self):
+        # during recovery we may need to read pidfiles for both running and
+        # parsing entries
+        queue_entries = HostQueueEntry.fetch(
+            where="status IN ('Running', 'Parsing')")
         for queue_entry in queue_entries:
-            run_monitor = PidfileRunMonitor(queue_entry.results_dir())
-            if not run_monitor.has_pid():
-                # autoserv apparently never got run, so requeue
-                requeue_entries.append(queue_entry)
-                continue
-            if queue_entry.id in recovered_entry_ids:
+            pidfile_id = _drone_manager.get_pidfile_id_from(
+                queue_entry.execution_tag())
+            _drone_manager.register_pidfile(pidfile_id)
+
+
+    def _recover_running_entries(self):
+        orphans = _drone_manager.get_orphaned_autoserv_processes()
+
+        queue_entries = HostQueueEntry.fetch(where="status = 'Running'")
+        requeue_entries = []
+        for queue_entry in queue_entries:
+            if self.get_agents_for_entry(queue_entry):
                 # synchronous job we've already recovered
                 continue
-            job_tag = queue_entry.job.get_job_tag([queue_entry])
-            pid = run_monitor.get_pid()
-            print 'Recovering %s (pid %d)' % (queue_entry.id, pid)
+            execution_tag = queue_entry.execution_tag()
+            run_monitor = PidfileRunMonitor()
+            run_monitor.attach_to_existing_process(execution_tag)
+            if not run_monitor.has_process():
+                # autoserv apparently never got run, so let it get requeued
+                continue
-            queue_entries = queue_entry.job.get_group_entries(queue_entry)
-            recovered_entry_ids.union(entry.id for entry in queue_entries)
-            self._recover_queue_entries(queue_entries, run_monitor)
+            group_entries = queue_entry.job.get_group_entries(queue_entry)
+            print 'Recovering %s (process %s)' % (
+                ', '.join(str(entry) for entry in group_entries),
+                run_monitor.get_process())
+            self._recover_queue_entries(group_entries, run_monitor)
-            orphans.pop(pid, None)
-
-        # and requeue other active queue entries
-        rows = _db.execute("""SELECT * FROM host_queue_entries
-                              WHERE active AND NOT complete
-                              AND status != 'Running'
-                              AND status != 'Pending'
-                              AND status != 'Abort'
-                              AND status != 'Aborting'""")
-        queue_entries = [HostQueueEntry(row=i) for i in rows]
-        for queue_entry in queue_entries + requeue_entries:
-            print 'Requeuing running QE %d' % queue_entry.id
-            queue_entry.clear_results_dir(dont_delete_files=True)
-            queue_entry.requeue()
-
+            orphans.pop(execution_tag, None)
 
         # now kill any remaining autoserv processes
-        for pid in orphans.keys():
-            print 'Killing orphan %d (%s)' % (pid, orphans[pid])
-            kill_autoserv(pid)
+        for process in orphans.itervalues():
+            print 'Killing orphan %s' % process
+            _drone_manager.kill_process(process)
 
-        # recover aborting tasks
-        rebooting_host_ids = set()
-        rows = _db.execute("""SELECT * FROM host_queue_entries
-                           WHERE status='Abort' or status='Aborting'""")
-        queue_entries = [HostQueueEntry(row=i) for i in rows]
+
+    def _recover_aborting_entries(self):
+        queue_entries = HostQueueEntry.fetch(
+            where='status IN ("Abort", "Aborting")')
         for queue_entry in queue_entries:
-            print 'Recovering aborting QE %d' % queue_entry.id
-            agent = queue_entry.abort()
-            self.add_agent(agent)
-            if queue_entry.get_host():
-                rebooting_host_ids.add(queue_entry.get_host().id)
+            print 'Recovering aborting QE %s' % queue_entry
+            queue_entry.abort(self)
 
-        self._recover_parsing_entries()
 
+    def _requeue_other_active_entries(self):
+        queue_entries = HostQueueEntry.fetch(
+            where='active AND NOT complete AND status != "Pending"')
+        for queue_entry in queue_entries:
+            if self.get_agents_for_entry(queue_entry):
+                # entry has already been recovered
+                continue
+            print 'Requeuing active QE %s (status=%s)' % (queue_entry,
+                                                          queue_entry.status)
+            if queue_entry.host:
+                tasks = queue_entry.host.reverify_tasks()
+                self.add_agent(Agent(tasks))
+            queue_entry.requeue()
+
+
+    def _reverify_remaining_hosts(self):
         # reverify hosts that were in the middle of verify, repair or cleanup
         self._reverify_hosts_where("""(status = 'Repairing' OR
                                        status = 'Verifying' OR
-                                       status = 'Cleaning')""",
-                                   exclude_ids=rebooting_host_ids)
+                                       status = 'Cleaning')""")
 
-        # finally, recover "Running" hosts with no active queue entries,
-        # although this should never happen
-        message = ('Recovering running host %s - this probably '
-                   'indicates a scheduler bug')
+        # recover "Running" hosts with no active queue entries, although this
+        # should never happen
+        message = ('Recovering running host %s - this probably indicates a '
+                   'scheduler bug')
         self._reverify_hosts_where("""status = 'Running' AND
                                       id NOT IN (SELECT host_id
                                                  FROM host_queue_entries
@@ -653,33 +584,31 @@
 
 
     def _reverify_hosts_where(self, where,
-                              print_message='Reverifying host %s',
-                              exclude_ids=set()):
-        rows = _db.execute('SELECT * FROM hosts WHERE locked = 0 AND '
-                           'invalid = 0 AND ' + where)
-        hosts = [Host(row=i) for i in rows]
-        for host in hosts:
-            if host.id in exclude_ids:
+                              print_message='Reverifying host %s'):
+        full_where = 'locked = 0 AND invalid = 0 AND ' + where
+        for host in Host.fetch(where=full_where):
+            if self.host_has_agent(host):
+                # host has already been recovered in some way
                 continue
-            if print_message is not None:
+            if print_message:
                 print print_message % host.hostname
-            verify_task = VerifyTask(host = host)
-            self.add_agent(Agent(tasks = [verify_task]))
+            tasks = host.reverify_tasks()
+            self.add_agent(Agent(tasks))
 
 
     def _recover_parsing_entries(self):
-        # make sure there are no old parsers running
-        os.system('killall parse')
-
         recovered_entry_ids = set()
         for entry in HostQueueEntry.fetch(where='status = "Parsing"'):
             if entry.id in recovered_entry_ids:
                 continue
             queue_entries = entry.job.get_group_entries(entry)
-            recovered_entry_ids.union(entry.id for entry in queue_entries)
+            recovered_entry_ids = recovered_entry_ids.union(
+                entry.id for entry in queue_entries)
+            print 'Recovering parsing entries %s' % (
+                ', '.join(str(entry) for entry in queue_entries))
 
             reparse_task = FinalReparseTask(queue_entries)
-            self.add_agent(Agent([reparse_task]))
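+            # parses are throttled separately via MAX_PARSE_PROCESSES, so
+            # they don't count against the dispatcher's process limits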
+            self.add_agent(Agent([reparse_task], num_processes=0))
 
 
     def _recover_hosts(self):
@@ -742,8 +671,6 @@
 
 
     def _schedule_new_jobs(self):
-        print "finding work"
-
         queue_entries = self._get_pending_queue_entries()
         if not queue_entries:
             return
@@ -759,24 +686,19 @@
 
     def _run_queue_entry(self, queue_entry, host):
         agent = queue_entry.run(assigned_host=host)
-        # in some cases (synchronous jobs with run_verify=False), agent may be None
+        # in some cases (synchronous jobs with run_verify=False), agent may be
+        # None
         if agent:
             self.add_agent(agent)
 
 
     def _find_aborting(self):
-        num_aborted = 0
-        # Find jobs that are aborting
         for entry in queue_entries_to_abort():
-            agents_to_abort = self.get_agents(entry)
+            agents_to_abort = list(self.get_agents_for_entry(entry))
             for agent in agents_to_abort:
                 self.remove_agent(agent)
 
-            agent = entry.abort(agents_to_abort)
-            self.add_agent(agent)
-            num_aborted += 1
-            if num_aborted >= 50:
-                break
+            entry.abort(self, agents_to_abort)
 
 
     def _can_start_agent(self, agent, num_running_processes,
@@ -811,7 +733,7 @@
         for agent in list(self._agents):
             if agent.is_done():
                 print "agent finished"
-                self._agents.remove(agent)
+                self.remove_agent(agent)
                 continue
             if not agent.is_running():
                 if not self._can_start_agent(agent, num_running_processes,
@@ -836,197 +758,112 @@
                 message += '\n(truncated)\n'
 
             print subject
-            email_manager.enqueue_notify_email(subject, message)
+            email_manager.manager.enqueue_notify_email(subject, message)
 
 
-class RunMonitor(object):
-    def __init__(self, cmd, nice_level = None, log_file = None):
-        self.nice_level = nice_level
-        self.log_file = log_file
-        self.cmd = cmd
-        self.proc = None
+class PidfileRunMonitor(object):
+    """
+    Client must call either run() to start a new process or
+    attach_to_existing_process().
+    """
 
-    def run(self):
-        if self.nice_level:
-            nice_cmd = ['nice','-n', str(self.nice_level)]
-            nice_cmd.extend(self.cmd)
-            self.cmd = nice_cmd
-
-        out_file = None
-        if self.log_file:
-            try:
-                os.makedirs(os.path.dirname(self.log_file))
-            except OSError, exc:
-                if exc.errno != errno.EEXIST:
-                    log_stacktrace(
-                        'Unexpected error creating logfile '
-                        'directory for %s' % self.log_file)
-            try:
-                out_file = open(self.log_file, 'a')
-                out_file.write("\n%s\n" % ('*'*80))
-                out_file.write("%s> %s\n" %
-                               (time.strftime("%X %x"),
-                                self.cmd))
-                out_file.write("%s\n" % ('*'*80))
-            except (OSError, IOError):
-                log_stacktrace('Error opening log file %s' %
-                               self.log_file)
-
-        if not out_file:
-            out_file = open('/dev/null', 'w')
-
-        in_devnull = open('/dev/null', 'r')
-        print "cmd = %s" % self.cmd
-        print "path = %s" % os.getcwd()
-
-        self.proc = subprocess.Popen(self.cmd, stdout=out_file,
-                                     stderr=subprocess.STDOUT,
-                                     stdin=in_devnull)
-        out_file.close()
-        in_devnull.close()
+    class _PidfileException(Exception):
+        """
+        Raised when there's some unexpected behavior with the pid file, but only
+        used internally (never allowed to escape this class).
+        """
 
 
-    def has_pid(self):
-        return self.proc is not None
+    def __init__(self):
+        self._lost_process = False
+        self._start_time = None
+        self.pidfile_id = None
+        self._state = drone_manager.PidfileContents()
 
 
-    def get_pid(self):
-        return self.proc.pid
+    def _add_nice_command(self, command, nice_level):
+        if not nice_level:
+            return command
+        return ['nice', '-n', str(nice_level)] + command
+
+
+    def _set_start_time(self):
+        self._start_time = time.time()
+
+
+    def run(self, command, working_directory, nice_level=None, log_file=None,
+            pidfile_name=None, paired_with_pidfile=None):
+        assert command is not None
+        command = self._add_nice_command(command, nice_level)
+        self._set_start_time()
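+        # paired_with_pidfile presumably makes the new command run on the
+        # same drone as the paired process (see drone_manager for details)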
+        self.pidfile_id = _drone_manager.execute_command(
+            command, working_directory, log_file=log_file,
+            pidfile_name=pidfile_name, paired_with_pidfile=paired_with_pidfile)
+
+
+    def attach_to_existing_process(self, execution_tag):
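+        # used during recovery: look up the pidfile autoserv wrote under
+        # this execution tag and monitor the existing process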
+        self._set_start_time()
+        self.pidfile_id = _drone_manager.get_pidfile_id_from(execution_tag)
+        _drone_manager.register_pidfile(self.pidfile_id)
 
 
     def kill(self):
-        if self.has_pid():
-            kill_autoserv(self.get_pid(), self.exit_code)
+        if self.has_process():
+            _drone_manager.kill_process(self.get_process())
 
 
-    def exit_code(self):
-        return self.proc.poll()
-
-
-class PidfileException(Exception):
-    """\
-    Raised when there's some unexpected behavior with the pid file.
-    """
-
-
-class PidfileRunMonitor(RunMonitor):
-    class PidfileState(object):
-        pid = None
-        exit_status = None
-        num_tests_failed = None
-
-        def reset(self):
-            self.pid = self.exit_status = self.all_tests_passed = None
-
-
-    def __init__(self, results_dir, cmd=None, nice_level=None,
-                 log_file=None):
-        self.results_dir = os.path.abspath(results_dir)
-        self.pid_file = os.path.join(results_dir, AUTOSERV_PID_FILE)
-        self.lost_process = False
-        self.start_time = time.time()
-        self._state = self.PidfileState()
-        super(PidfileRunMonitor, self).__init__(cmd, nice_level, log_file)
-
-
-    def has_pid(self):
+    def has_process(self):
         self._get_pidfile_info()
-        return self._state.pid is not None
+        return self._state.process is not None
 
 
-    def get_pid(self):
+    def get_process(self):
         self._get_pidfile_info()
-        assert self._state.pid is not None
-        return self._state.pid
+        assert self.has_process()
+        return self._state.process
 
 
-    def _check_command_line(self, command_line, spacer=' ',
-                            print_error=False):
-        results_dir_arg = spacer.join(('', '-r', self.results_dir, ''))
-        match = results_dir_arg in command_line
-        if print_error and not match:
-            print '%s not found in %s' % (repr(results_dir_arg),
-                                          repr(command_line))
-        return match
-
-
-    def _check_proc_fs(self):
-        cmdline_path = os.path.join('/proc', str(self._state.pid), 'cmdline')
-        try:
-            cmdline_file = open(cmdline_path, 'r')
-            cmdline = cmdline_file.read().strip()
-            cmdline_file.close()
-        except IOError:
-            return False
-        # /proc/.../cmdline has \x00 separating args
-        return self._check_command_line(cmdline, spacer='\x00',
-                                        print_error=True)
-
-
-    def _read_pidfile(self):
-        self._state.reset()
-        if not os.path.exists(self.pid_file):
-            return
-        file_obj = open(self.pid_file, 'r')
-        lines = file_obj.readlines()
-        file_obj.close()
-        if not lines:
-            return
-        if len(lines) > 3:
-            raise PidfileException('Corrupt pid file (%d lines) at %s:\n%s' %
-                                   (len(lines), self.pid_file, lines))
-        try:
-            self._state.pid = int(lines[0])
-            if len(lines) > 1:
-                self._state.exit_status = int(lines[1])
-                if len(lines) == 3:
-                    self._state.num_tests_failed = int(lines[2])
-                else:
-                    # maintain backwards-compatibility with two-line pidfiles
-                    self._state.num_tests_failed = 0
-        except ValueError, exc:
-            raise PidfileException('Corrupt pid file: ' + str(exc.args))
-
-
-    def _find_autoserv_proc(self):
-        autoserv_procs = Dispatcher.find_autoservs()
-        for pid, args in autoserv_procs.iteritems():
-            if self._check_command_line(args):
-                return pid, args
-        return None, None
+    def _read_pidfile(self, use_second_read=False):
+        assert self.pidfile_id is not None, (
+            'You must call run() or attach_to_existing_process()')
+        contents = _drone_manager.get_pidfile_contents(
+            self.pidfile_id, use_second_read=use_second_read)
+        if contents.is_invalid():
+            self._state = drone_manager.PidfileContents()
+            raise self._PidfileException(contents)
+        self._state = contents
 
 
     def _handle_pidfile_error(self, error, message=''):
-        message = error + '\nPid: %s\nPidfile: %s\n%s' % (self._state.pid,
-                                                          self.pid_file,
-                                                          message)
+        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
+            self._state.process, self.pidfile_id, message)
         print message
-        email_manager.enqueue_notify_email(error, message)
-        if self._state.pid is not None:
-            pid = self._state.pid
+        email_manager.manager.enqueue_notify_email(error, message)
+        if self._state.process is not None:
+            process = self._state.process
         else:
-            pid = 0
-        self.on_lost_process(pid)
+            process = _drone_manager.get_dummy_process()
+        self.on_lost_process(process)
 
 
     def _get_pidfile_info_helper(self):
-        if self.lost_process:
+        if self._lost_process:
             return
 
         self._read_pidfile()
 
-        if self._state.pid is None:
-            self._handle_no_pid()
+        if self._state.process is None:
+            self._handle_no_process()
             return
 
         if self._state.exit_status is None:
             # double check whether or not autoserv is running
-            proc_running = self._check_proc_fs()
-            if proc_running:
+            if _drone_manager.is_process_running(self._state.process):
                 return
 
-            # pid but no process - maybe process *just* exited
-            self._read_pidfile()
+            # pid but no running process - maybe process *just* exited
+            self._read_pidfile(use_second_read=True)
             if self._state.exit_status is None:
                 # autoserv exited without writing an exit code
                 # to the pidfile
@@ -1043,46 +880,33 @@
         """
         try:
             self._get_pidfile_info_helper()
-        except PidfileException, exc:
+        except self._PidfileException:
             self._handle_pidfile_error('Pidfile error', traceback.format_exc())
 
 
-    def _handle_no_pid(self):
+    def _handle_no_process(self):
         """\
         Called when no pidfile is found or no pid is in the pidfile.
         """
-        # is autoserv running?
-        pid, args = self._find_autoserv_proc()
-        if pid is None:
-            # no autoserv process running
-            message = 'No pid found at ' + self.pid_file
-        else:
-            message = ("Process %d (%s) hasn't written pidfile %s" %
-                       (pid, args, self.pid_file))
-
+        message = 'No pid found at %s' % self.pidfile_id
         print message
-        if time.time() - self.start_time > PIDFILE_TIMEOUT:
-            email_manager.enqueue_notify_email(
+        if time.time() - self._start_time > PIDFILE_TIMEOUT:
+            email_manager.manager.enqueue_notify_email(
                 'Process has failed to write pidfile', message)
-            if pid is not None:
-                kill_autoserv(pid)
-            else:
-                pid = 0
-            self.on_lost_process(pid)
-            return
+            self.on_lost_process(_drone_manager.get_dummy_process())
 
 
-    def on_lost_process(self, pid):
+    def on_lost_process(self, process):
         """\
         Called when autoserv has exited without writing an exit status,
         or we've timed out waiting for autoserv to write a pid to the
         pidfile.  In either case, we just return failure and the caller
         should signal some kind of warning.
 
-        pid is unimportant here, as it shouldn't be used by anyone.
+        process is unimportant here, as it shouldn't be used by anyone.
         """
-        self.lost_process = True
-        self._state.pid = pid
+        self._lost_process = True
+        self._state.process = process
         self._state.exit_status = 1
         self._state.num_tests_failed = 0
 
@@ -1099,17 +923,24 @@
 
 
 class Agent(object):
-    def __init__(self, tasks, queue_entry_ids=[], num_processes=1):
+    def __init__(self, tasks, num_processes=1):
         self.active_task = None
         self.queue = Queue.Queue(0)
         self.dispatcher = None
-        self.queue_entry_ids = queue_entry_ids
         self.num_processes = num_processes
 
+        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
+                                               for task in tasks)
+        self.host_ids = self._union_ids(task.host_ids for task in tasks)
+
         for task in tasks:
             self.add_task(task)
 
 
+    def _union_ids(self, id_lists):
+        return set(itertools.chain(*id_lists))
+
+
     def add_task(self, task):
         self.queue.put_nowait(task)
         task.agent = self
@@ -1160,19 +991,31 @@
 
 
 class AgentTask(object):
-    def __init__(self, cmd, failure_tasks = []):
+    def __init__(self, cmd, working_directory=None, failure_tasks=[]):
         self.done = False
         self.failure_tasks = failure_tasks
         self.started = False
         self.cmd = cmd
+        self._working_directory = working_directory
         self.task = None
         self.agent = None
         self.monitor = None
         self.success = None
+        self.queue_entry_ids = []
+        self.host_ids = []
+        self.log_file = None
+
+
+    def _set_ids(self, host=None, queue_entries=None):
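+        # queue_entries may be [None] when a task was built around a bare
+        # host (e.g. RepairTask with queue_entry=None)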
+        if queue_entries and queue_entries != [None]:
+            self.host_ids = [entry.host.id for entry in queue_entries]
+            self.queue_entry_ids = [entry.id for entry in queue_entries]
+        else:
+            assert host
+            self.host_ids = [host.id]
 
 
     def poll(self):
-        print "poll"
         if self.monitor:
             self.tick(self.monitor.exit_code())
         else:
@@ -1180,9 +1023,8 @@
 
 
     def tick(self, exit_code):
-        if exit_code==None:
+        if exit_code is None:
             return
-#               print "exit_code was %d" % exit_code
         if exit_code == 0:
             success = True
         else:
@@ -1206,13 +1048,13 @@
 
 
     def create_temp_resultsdir(self, suffix=''):
-        self.temp_results_dir = tempfile.mkdtemp(suffix=suffix)
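+        # note: suffix is no longer used; temporary paths are now handed out
+        # by the drone manager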
+        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')
 
 
     def cleanup(self):
-        if (hasattr(self, 'temp_results_dir') and
-            os.path.exists(self.temp_results_dir)):
-            shutil.rmtree(self.temp_results_dir)
+        if self.monitor and self.log_file:
+            _drone_manager.copy_to_results_repository(
+                self.monitor.get_process(), self.log_file)
 
 
     def epilog(self):
@@ -1236,35 +1078,40 @@
         self.cleanup()
 
 
+    def set_host_log_file(self, base_name, host):
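+        # per-host logs land under hosts/<hostname>/<timestamp>.<base_name>,
+        # relative to the results repository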
+        filename = '%s.%s' % (time.time(), base_name)
+        self.log_file = os.path.join('hosts', host.hostname, filename)
+
+
     def run(self):
         if self.cmd:
-            print "agent starting monitor"
-            log_file = None
-            if hasattr(self, 'log_file'):
-                log_file = self.log_file
-            elif hasattr(self, 'host'):
-                log_file = os.path.join(RESULTS_DIR, 'hosts',
-                                        self.host.hostname)
-            self.monitor = RunMonitor(
-                self.cmd, nice_level=AUTOSERV_NICE_LEVEL, log_file=log_file)
-            self.monitor.run()
+            self.monitor = PidfileRunMonitor()
+            self.monitor.run(self.cmd, self._working_directory,
+                             nice_level=AUTOSERV_NICE_LEVEL,
+                             log_file=self.log_file)
 
 
 class RepairTask(AgentTask):
     def __init__(self, host, queue_entry=None):
         """\
-        fail_queue_entry: queue entry to mark failed if this repair
-        fails.
+        queue_entry: queue entry to mark failed if this repair fails.
         """
         protection = host_protections.Protection.get_string(host.protection)
         # normalize the protection name
         protection = host_protections.Protection.get_attr_name(protection)
-        self.create_temp_resultsdir('.repair')
-        cmd = [_autoserv_path , '-R', '-m', host.hostname,
-               '-r', self.temp_results_dir, '--host-protection', protection]
+
         self.host = host
         self.queue_entry = queue_entry
-        super(RepairTask, self).__init__(cmd)
+
+        self.create_temp_resultsdir('.repair')
+        cmd = [_autoserv_path, '-p', '-R', '-m', host.hostname,
+               '-r', _drone_manager.absolute_path(self.temp_results_dir),
+               '--host-protection', protection]
+        super(RepairTask, self).__init__(cmd, self.temp_results_dir)
+
+        self._set_ids(host=host, queue_entries=[queue_entry])
+        self.set_host_log_file('repair', self.host)
 
 
     def prolog(self):
@@ -1285,66 +1132,34 @@
 
 
 class PreJobTask(AgentTask):
-    def prolog(self):
-        super(PreJobTask, self).prolog()
-        if self.queue_entry:
-            # clear any possibly existing results, could be a previously failed
-            # verify or a previous execution that crashed
-            self.queue_entry.clear_results_dir()
-
-
-    def cleanup(self):
-        if not os.path.exists(self.temp_results_dir):
-            return
+    def epilog(self):
+        super(PreJobTask, self).epilog()
         should_copy_results = (self.queue_entry and not self.success
                                and not self.queue_entry.meta_host)
         if should_copy_results:
             self.queue_entry.set_execution_subdir()
-            self._move_results()
-        super(PreJobTask, self).cleanup()
-
-
-    def _move_results(self):
-        assert self.queue_entry is not None
-        target_dir = self.queue_entry.results_dir()
-        ensure_directory_exists(target_dir)
-        files = os.listdir(self.temp_results_dir)
-        for filename in files:
-            if filename == AUTOSERV_PID_FILE:
-                continue
-            self._force_move(os.path.join(self.temp_results_dir, filename),
-                            os.path.join(target_dir, filename))
-
-
-    @staticmethod
-    def _force_move(source, dest):
-        """\
-        Replacement for shutil.move() that will delete the destination
-        if it exists, even if it's a directory.
-        """
-        if os.path.exists(dest):
-            warning = 'Warning: removing existing destination file ' + dest
-            print warning
-            email_manager.enqueue_notify_email(warning, warning)
-            remove_file_or_dir(dest)
-        shutil.move(source, dest)
+            destination = os.path.join(self.queue_entry.execution_tag(),
+                                       os.path.basename(self.log_file))
+            _drone_manager.copy_to_results_repository(
+                self.monitor.get_process(), self.log_file,
+                destination_path=destination)
 
 
 class VerifyTask(PreJobTask):
     def __init__(self, queue_entry=None, host=None):
         assert bool(queue_entry) != bool(host)
-
         self.host = host or queue_entry.host
         self.queue_entry = queue_entry
 
         self.create_temp_resultsdir('.verify')
-
-        cmd = [_autoserv_path, '-v', '-m', self.host.hostname, '-r',
-               self.temp_results_dir]
-
+        cmd = [_autoserv_path, '-p', '-v', '-m', self.host.hostname, '-r',
+               _drone_manager.absolute_path(self.temp_results_dir)]
         failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
+        super(VerifyTask, self).__init__(cmd, self.temp_results_dir,
+                                         failure_tasks=failure_tasks)
 
-        super(VerifyTask, self).__init__(cmd, failure_tasks=failure_tasks)
+        self.set_host_log_file('verify', self.host)
+        self._set_ids(host=host, queue_entries=[queue_entry])
 
 
     def prolog(self):
@@ -1368,56 +1183,44 @@
 
 class QueueTask(AgentTask):
     def __init__(self, job, queue_entries, cmd):
-        super(QueueTask, self).__init__(cmd)
         self.job = job
         self.queue_entries = queue_entries
+        super(QueueTask, self).__init__(cmd, self._execution_tag())
+        self._set_ids(queue_entries=queue_entries)
 
 
-    @staticmethod
-    def _write_keyval(keyval_dir, field, value, keyval_filename='keyval'):
-        key_path = os.path.join(keyval_dir, keyval_filename)
-        keyval_file = open(key_path, 'a')
-        print >> keyval_file, '%s=%s' % (field, str(value))
-        keyval_file.close()
+    def _format_keyval(self, key, value):
+        return '%s=%s' % (key, value)
 
 
-    def _host_keyval_dir(self):
-        return os.path.join(self.results_dir(), 'host_keyvals')
+    def _write_keyval(self, field, value):
+        keyval_path = os.path.join(self._execution_tag(), 'keyval')
+        assert self.monitor and self.monitor.has_process()
+        paired_with_pidfile = self.monitor.pidfile_id
+        _drone_manager.write_lines_to_file(
+            keyval_path, [self._format_keyval(field, value)],
+            paired_with_pidfile=paired_with_pidfile)
 
 
-    def _write_host_keyval(self, host):
-        labels = ','.join(host.labels())
-        self._write_keyval(self._host_keyval_dir(), 'labels', labels,
-                           keyval_filename=host.hostname)
-
-    def _create_host_keyval_dir(self):
-        directory = self._host_keyval_dir()
-        ensure_directory_exists(directory)
+    def _write_host_keyvals(self, host):
+        keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
+                                   host.hostname)
+        platform, all_labels = host.platform_and_labels()
+        keyvals = dict(platform=platform, labels=','.join(all_labels))
+        keyval_content = '\n'.join(self._format_keyval(key, value)
+                                   for key, value in keyvals.iteritems())
+        _drone_manager.attach_file_to_execution(self._execution_tag(),
+                                                keyval_content,
+                                                file_path=keyval_path)
 
 
-    def results_dir(self):
-        return self.queue_entries[0].results_dir()
-
-
-    def run(self):
-        """\
-        Override AgentTask.run() so we can use a PidfileRunMonitor.
-        """
-        self.monitor = PidfileRunMonitor(self.results_dir(),
-                                         cmd=self.cmd,
-                                         nice_level=AUTOSERV_NICE_LEVEL)
-        self.monitor.run()
+    def _execution_tag(self):
+        return self.queue_entries[0].execution_tag()
 
 
     def prolog(self):
-        # write some job timestamps into the job keyval file
-        queued = time.mktime(self.job.created_on.timetuple())
-        started = time.time()
-        self._write_keyval(self.results_dir(), "job_queued", int(queued))
-        self._write_keyval(self.results_dir(), "job_started", int(started))
-        self._create_host_keyval_dir()
         for queue_entry in self.queue_entries:
-            self._write_host_keyval(queue_entry.host)
+            self._write_host_keyvals(queue_entry.host)
             queue_entry.set_status('Running')
             queue_entry.host.set_status('Running')
             queue_entry.host.update_field('dirty', 1)
@@ -1427,22 +1230,30 @@
 
 
     def _finish_task(self, success):
-        # write out the finished time into the results keyval
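+        # keyvals go through the drone manager and require a live monitor
+        # (see the assert in _write_keyval), so both timestamps are written
+        # here at finish time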
+        queued = time.mktime(self.job.created_on.timetuple())
         finished = time.time()
-        self._write_keyval(self.results_dir(), "job_finished", int(finished))
+        self._write_keyval("job_queued", int(queued))
+        self._write_keyval("job_finished", int(finished))
+
+        _drone_manager.copy_to_results_repository(self.monitor.get_process(),
+                                                  self._execution_tag() + '/')
 
         # parse the results of the job
         reparse_task = FinalReparseTask(self.queue_entries)
-        self.agent.dispatcher.add_agent(Agent([reparse_task]))
+        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
 
 
     def _write_status_comment(self, comment):
-        status_log = open(os.path.join(self.results_dir(), 'status.log'), 'a')
-        status_log.write('INFO\t----\t----\t' + comment)
-        status_log.close()
+        _drone_manager.write_lines_to_file(
+            os.path.join(self._execution_tag(), 'status.log'),
+            ['INFO\t----\t----\t' + comment],
+            paired_with_pidfile=self.monitor.pidfile_id)
 
 
     def _log_abort(self):
+        if not self.monitor or not self.monitor.has_process():
+            return
+
         # build up sets of all the aborted_by and aborted_on values
         aborted_by, aborted_on = set(), set()
         for queue_entry in self.queue_entries:
@@ -1459,9 +1270,10 @@
         else:
             aborted_by_value = 'autotest_system'
             aborted_on_value = int(time.time())
-        results_dir = self.results_dir()
-        self._write_keyval(results_dir, "aborted_by", aborted_by_value)
-        self._write_keyval(results_dir, "aborted_on", aborted_on_value)
+
+        self._write_keyval("aborted_by", aborted_by_value)
+        self._write_keyval("aborted_on", aborted_on_value)
+
         aborted_on_string = str(datetime.datetime.fromtimestamp(
             aborted_on_value))
         self._write_status_comment('Job aborted by %s on %s' %
@@ -1495,10 +1307,6 @@
 
     def epilog(self):
         super(QueueTask, self).epilog()
-        for queue_entry in self.queue_entries:
-            # set status to PARSING here so queue entry is marked complete
-            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
-
         self._finish_task(self.success)
         self._reboot_hosts()
 
@@ -1507,8 +1315,7 @@
 
 class RecoveryQueueTask(QueueTask):
     def __init__(self, job, queue_entries, run_monitor):
-        super(RecoveryQueueTask, self).__init__(job,
-                                         queue_entries, cmd=None)
+        super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
         self.run_monitor = run_monitor
 
 
@@ -1526,14 +1333,18 @@
         assert bool(host) ^ bool(queue_entry)
         if queue_entry:
             host = queue_entry.get_host()
-
-        self.create_temp_resultsdir('.cleanup')
-        self.cmd = [_autoserv_path, '--cleanup', '-m', host.hostname,
-                    '-r', self.temp_results_dir]
         self.queue_entry = queue_entry
         self.host = host
+
+        self.create_temp_resultsdir('.cleanup')
+        cmd = [_autoserv_path, '-p', '--cleanup', '-m', host.hostname,
+               '-r', _drone_manager.absolute_path(self.temp_results_dir)]
         repair_task = RepairTask(host, queue_entry=queue_entry)
-        super(CleanupTask, self).__init__(self.cmd, failure_tasks=[repair_task])
+        super(CleanupTask, self).__init__(cmd, self.temp_results_dir,
+                                          failure_tasks=[repair_task])
+
+        self._set_ids(host=host, queue_entries=[queue_entry])
+        self.set_host_log_file('cleanup', self.host)
 
 
     def prolog(self):
@@ -1551,9 +1362,11 @@
 
 class AbortTask(AgentTask):
     def __init__(self, queue_entry, agents_to_abort):
-        self.queue_entry = queue_entry
-        self.agents_to_abort = agents_to_abort
         super(AbortTask, self).__init__('')
+        self.queue_entry = queue_entry
+        # don't use _set_ids, since we don't want to set the host_ids
+        self.queue_entry_ids = [queue_entry.id]
+        self.agents_to_abort = agents_to_abort
 
 
     def prolog(self):
@@ -1574,27 +1387,33 @@
 
 
 class FinalReparseTask(AgentTask):
-    MAX_PARSE_PROCESSES = (
-        global_config.global_config.get_config_value(
-            _global_config_section, 'max_parse_processes', type=int))
+    MAX_PARSE_PROCESSES = global_config.global_config.get_config_value(
+            CONFIG_SECTION, 'max_parse_processes', type=int)
     _num_running_parses = 0
 
     def __init__(self, queue_entries):
         self._queue_entries = queue_entries
+        # don't use _set_ids, since we don't want to set the host_ids
+        self.queue_entry_ids = [entry.id for entry in queue_entries]
         self._parse_started = False
 
         assert len(queue_entries) > 0
         queue_entry = queue_entries[0]
 
+        self._execution_tag = queue_entry.execution_tag()
+        self._results_dir = _drone_manager.absolute_path(self._execution_tag)
+        self._autoserv_monitor = PidfileRunMonitor()
+        self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
+        self._final_status = self._determine_final_status()
+
         if _testing_mode:
             self.cmd = 'true'
-            return
+        else:
+            super(FinalReparseTask, self).__init__(
+                cmd=self._generate_parse_command(),
+                working_directory=self._execution_tag)
 
-        self._results_dir = queue_entry.results_dir()
-        self.log_file = os.path.abspath(os.path.join(self._results_dir,
-                                                     '.parse.log'))
-        super(FinalReparseTask, self).__init__(
-            cmd=self._generate_parse_command())
+        self.log_file = os.path.join(self._execution_tag, '.parse.log')
 
 
     @classmethod
@@ -1612,6 +1431,13 @@
         return cls._num_running_parses < cls.MAX_PARSE_PROCESSES
 
 
+    def _determine_final_status(self):
+        # we'll use a PidfileRunMonitor to read the autoserv exit status
+        if self._autoserv_monitor.exit_code() == 0:
+            return models.HostQueueEntry.Status.COMPLETED
+        return models.HostQueueEntry.Status.FAILED
+
+
     def prolog(self):
         super(FinalReparseTask, self).prolog()
         for queue_entry in self._queue_entries:
@@ -1620,22 +1446,13 @@
 
     def epilog(self):
         super(FinalReparseTask, self).epilog()
-        final_status = self._determine_final_status()
         for queue_entry in self._queue_entries:
-            queue_entry.set_status(final_status)
-
-
-    def _determine_final_status(self):
-        # use a PidfileRunMonitor to read the autoserv exit status
-        monitor = PidfileRunMonitor(self._results_dir)
-        if monitor.exit_code() == 0:
-            return models.HostQueueEntry.Status.COMPLETED
-        return models.HostQueueEntry.Status.FAILED
+            queue_entry.set_status(self._final_status)
 
 
     def _generate_parse_command(self):
-        parse = os.path.abspath(os.path.join(AUTOTEST_TKO_DIR, 'parse'))
-        return [parse, '-l', '2', '-r', '-o', self._results_dir]
+        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
+                self._results_dir]
 
 
     def poll(self):
@@ -1655,8 +1472,14 @@
     def _try_starting_parse(self):
         if not self._can_run_new_parse():
             return
+
         # actually run the parse command
-        super(FinalReparseTask, self).run()
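+        # give the parser its own pidfile name so it doesn't collide with
+        # the autoserv pidfile in the same execution directory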
+        self.monitor = PidfileRunMonitor()
+        self.monitor.run(self.cmd, self._working_directory,
+                         log_file=self.log_file,
+                         pidfile_name='.parser_execute',
+                         paired_with_pidfile=self._autoserv_monitor.pidfile_id)
+
         self._increment_running_parses()
         self._parse_started = True
 
@@ -1846,19 +1669,32 @@
         self.update_field('status',status)
 
 
-    def labels(self):
+    def platform_and_labels(self):
         """
-        Fetch a list of names of all non-platform labels associated with this
-        host.
+        Returns a tuple (platform_name, list_of_all_label_names).
         """
         rows = _db.execute("""
-                SELECT labels.name
+                SELECT labels.name, labels.platform
                 FROM labels
                 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
-                WHERE NOT labels.platform AND hosts_labels.host_id = %s
+                WHERE hosts_labels.host_id = %s
                 ORDER BY labels.name
                 """, (self.id,))
-        return [row[0] for row in rows]
+        platform = None
+        all_labels = []
+        for label_name, is_platform in rows:
+            if is_platform:
+                platform = label_name
+            all_labels.append(label_name)
+        return platform, all_labels
+
+
+    def reverify_tasks(self):
+        cleanup_task = CleanupTask(host=self)
+        verify_task = VerifyTask(host=self)
+        # just to make sure this host does not get taken away
+        self.set_status('Cleaning')
+        return [cleanup_task, verify_task]
 
 
 class HostQueueEntry(DBObject):
@@ -1872,7 +1708,7 @@
         else:
             self.host = None
 
-        self.queue_log_path = os.path.join(self.job.results_dir(),
+        self.queue_log_path = os.path.join(self.job.tag(),
                                            'queue.log.' + str(self.id))
 
 
@@ -1911,9 +1747,8 @@
 
     def queue_log_record(self, log_line):
         now = str(datetime.datetime.now())
-        queue_log = open(self.queue_log_path, 'a', 0)
-        queue_log.write(now + ' ' + log_line + '\n')
-        queue_log.close()
+        _drone_manager.write_lines_to_file(self.queue_log_path,
+                                           [now + ' ' + log_line])
 
 
     def block_host(self, host_id):
@@ -1931,10 +1766,6 @@
             block.delete()
 
 
-    def results_dir(self):
-        return os.path.join(self.job.job_dir, self.execution_subdir)
-
-
     def set_execution_subdir(self, subdir=None):
         if subdir is None:
             assert self.get_host()
@@ -1948,6 +1779,10 @@
         return 'no host'
 
 
+    def __str__(self):
+        return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
+
+
     def set_status(self, status):
         abort_statuses = ['Abort', 'Aborting', 'Aborted']
         if status not in abort_statuses:
@@ -1957,8 +1792,7 @@
             condition = ''
         self.update_field('status', status, condition=condition)
 
-        print "%s/%d (%d) -> %s" % (self._get_hostname(), self.job.id, self.id,
-                                    self.status)
+        print "%s -> %s" % (self, self.status)
 
         if status in ['Queued', 'Parsing']:
             self.update_field('complete', False)
@@ -1989,7 +1823,7 @@
         body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                 self.job.id, self.job.name, hostname, status,
                 self._view_job_url())
-        send_email(self.job.email_list, subject, body)
+        email_manager.manager.send_email(self.job.email_list, subject, body)
 
 
     def _email_on_job_complete(self):
@@ -2014,14 +1848,13 @@
         body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                 self.job.id, self.job.name, status,  self._view_job_url(),
                 summary_text)
-        send_email(self.job.email_list, subject, body)
+        email_manager.manager.send_email(self.job.email_list, subject, body)
 
 
     def run(self,assigned_host=None):
         if self.meta_host:
             assert assigned_host
             # ensure results dir exists for the queue log
-            self.job.create_results_dir()
             self.set_host(assigned_host)
 
         print "%s/%s scheduled on %s, status=%s" % (self.job.name,
@@ -2031,7 +1864,6 @@
 
     def requeue(self):
         self.set_status('Queued')
-
         if self.meta_host:
             self.set_host(None)
 
@@ -2046,24 +1878,6 @@
         self.job.stop_if_necessary()
 
 
-    def clear_results_dir(self, dont_delete_files=False):
-        if not self.execution_subdir:
-            return
-        results_dir = self.results_dir()
-        if not os.path.exists(results_dir):
-            return
-        if dont_delete_files:
-            temp_dir = tempfile.mkdtemp(suffix='.clear_results')
-            print 'Moving results from %s to %s' % (results_dir, temp_dir)
-        for filename in os.listdir(results_dir):
-            path = os.path.join(results_dir, filename)
-            if dont_delete_files:
-                shutil.move(path, os.path.join(temp_dir, filename))
-            else:
-                remove_file_or_dir(path)
-        remove_file_or_dir(results_dir)
-
-
     @property
     def aborted_by(self):
         self._load_abort_info()
@@ -2107,20 +1921,18 @@
         return None
 
 
-    def abort(self, agents_to_abort=[]):
-        abort_task = AbortTask(self, agents_to_abort)
-        tasks = [abort_task]
-
+    def abort(self, dispatcher, agents_to_abort=[]):
         host = self.get_host()
         if self.active and host:
-            cleanup_task = CleanupTask(host=host)
-            verify_task = VerifyTask(host=host)
-            # just to make sure this host does not get taken away
-            host.set_status('Cleaning')
-            tasks += [cleanup_task, verify_task]
+            dispatcher.add_agent(Agent(tasks=host.reverify_tasks()))
 
+        abort_task = AbortTask(self, agents_to_abort)
         self.set_status('Aborting')
-        return Agent(tasks=tasks, queue_entry_ids=[self.id])
+        dispatcher.add_agent(Agent(tasks=[abort_task], num_processes=0))
+
+
+    def execution_tag(self):
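+        # e.g. "1234-showard/myhost", or "1234-showard/group0" for
+        # synchronous groups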
+        assert self.execution_subdir
+        return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
 
 
 class Job(DBObject):
@@ -2128,9 +1940,6 @@
         assert id or row
         super(Job, self).__init__(id=id, row=row)
 
-        self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
-                                                            self.owner))
-
 
     @classmethod
     def _get_table(cls):
@@ -2148,6 +1957,10 @@
         return self.control_type != 2
 
 
+    def tag(self):
+        return "%s-%s" % (self.id, self.owner)
+
+
     def get_host_queue_entries(self):
         rows = _db.execute("""
                 SELECT * FROM host_queue_entries
@@ -2174,9 +1987,6 @@
         return (pending_entries.count() >= self.synch_count)
 
 
-    def results_dir(self):
-        return self.job_dir
-
     def num_machines(self, clause = None):
         sql = "job_id=%s" % self.id
         if clause:
@@ -2224,25 +2034,8 @@
 
     def write_to_machines_file(self, queue_entry):
         hostname = queue_entry.get_host().hostname
-        print "writing %s to job %s machines file" % (hostname, self.id)
-        file_path = os.path.join(self.job_dir, '.machines')
-        mf = open(file_path, 'a')
-        mf.write(hostname + '\n')
-        mf.close()
-
-
-    def create_results_dir(self, queue_entry=None):
-        ensure_directory_exists(self.job_dir)
-
-        if queue_entry:
-            results_dir = queue_entry.results_dir()
-            if os.path.exists(results_dir):
-                warning = 'QE results dir ' + results_dir + ' already exists'
-                print warning
-                email_manager.enqueue_notify_email(warning, warning)
-            ensure_directory_exists(results_dir)
-            return results_dir
-        return self.job_dir
+        file_path = os.path.join(self.tag(), '.machines')
+        _drone_manager.write_lines_to_file(file_path, [hostname])
 
 
     def _next_group_name(self):
@@ -2258,14 +2051,10 @@
         return "group%d" % next_id
 
 
-    def _write_control_file(self):
-        'Writes control file out to disk, returns a filename'
-        control_fd, control_filename = tempfile.mkstemp(suffix='.control_file')
-        control_file = os.fdopen(control_fd, 'w')
-        if self.control_file:
-            control_file.write(self.control_file)
-        control_file.close()
-        return control_filename
+    def _write_control_file(self, execution_tag):
+        control_path = _drone_manager.attach_file_to_execution(
+                execution_tag, self.control_file)
+        return control_path
 
 
     def get_group_entries(self, queue_entry_from_group):
@@ -2275,23 +2064,17 @@
             params=(self.id, execution_subdir)))
 
 
-    def get_job_tag(self, queue_entries):
-        assert len(queue_entries) > 0
-        execution_subdir = queue_entries[0].execution_subdir
-        assert execution_subdir
-        return "%s-%s/%s" % (self.id, self.owner, execution_subdir)
-
-
     def _get_autoserv_params(self, queue_entries):
-        results_dir = self.create_results_dir(queue_entries[0])
-        control_filename = self._write_control_file()
+        assert queue_entries
+        execution_tag = queue_entries[0].execution_tag()
+        control_path = self._write_control_file(execution_tag)
         hostnames = ','.join([entry.get_host().hostname
                               for entry in queue_entries])
-        job_tag = self.get_job_tag(queue_entries)
 
-        params = [_autoserv_path, '-P', job_tag, '-p', '-n',
-                  '-r', os.path.abspath(results_dir), '-u', self.owner,
-                  '-l', self.name, '-m', hostnames, control_filename]
+        params = [_autoserv_path, '-P', execution_tag, '-p', '-n',
+                  '-r', _drone_manager.absolute_path(execution_tag),
+                  '-u', self.owner, '-l', self.name, '-m', hostnames,
+                  _drone_manager.absolute_path(control_path)]
 
         if not self.is_server_job():
             params.append('-c')
@@ -2344,8 +2127,7 @@
         if not self.is_ready():
             if self.run_verify:
                 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
-                return Agent(self._get_pre_job_tasks(queue_entry),
-                             [queue_entry.id])
+                return Agent(self._get_pre_job_tasks(queue_entry))
             else:
                 return queue_entry.on_pending()
 
@@ -2362,7 +2144,7 @@
         tasks = initial_tasks + [queue_task]
         entry_ids = [entry.id for entry in queue_entries]
 
-        return Agent(tasks, entry_ids, num_processes=len(queue_entries))
+        return Agent(tasks, num_processes=len(queue_entries))
 
 
 if __name__ == '__main__':
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py
index 059d1ee..2dff024 100644
--- a/scheduler/monitor_db_unittest.py
+++ b/scheduler/monitor_db_unittest.py
@@ -10,12 +10,32 @@
 from autotest_lib.database import database_connection, migrate
 from autotest_lib.frontend import thread_local
 from autotest_lib.frontend.afe import models
-from autotest_lib.scheduler import monitor_db
+from autotest_lib.scheduler import monitor_db, drone_manager, email_manager
 
 _DEBUG = False
 
-class Dummy(object):
-    'Dummy object that can have attribute assigned to it'
+class DummyAgent(object):
+    _is_running = False
+    _is_done = False
+    num_processes = 1
+    host_ids = []
+    queue_entry_ids = []
+
+    def is_running(self):
+        return self._is_running
+
+
+    def tick(self):
+        self._is_running = True
+
+
+    def is_done(self):
+        return self._is_done
+
+
+    def set_done(self, done):
+        self._is_done = done
+        self._is_running = not done
 
 
 class IsRow(mock.argument_comparator):
@@ -67,6 +87,8 @@
 
     def _set_monitor_stubs(self):
         monitor_db._db = self._database
+        monitor_db._drone_manager._results_dir = '/test/path'
+        monitor_db._drone_manager._temporary_directory = '/test/path/tmp'
 
 
     def _fill_in_test_data(self):
@@ -158,7 +180,7 @@
             else:
                 host = hqe_self.host
             self._record_job_scheduled(hqe_self.job.id, host.id)
-            return Dummy()
+            return DummyAgent()
         monitor_db.HostQueueEntry.run = run_stub
 
 
@@ -401,30 +423,8 @@
         self._dispatcher.max_processes_started_per_cycle = self._MAX_STARTED
 
 
-    class DummyAgent(object):
-        _is_running = False
-        _is_done = False
-        num_processes = 1
-
-        def is_running(self):
-            return self._is_running
-
-
-        def tick(self):
-            self._is_running = True
-
-
-        def is_done(self):
-            return self._is_done
-
-
-        def set_done(self, done):
-            self._is_done = done
-            self._is_running = not done
-
-
     def _setup_some_agents(self, num_agents):
-        self._agents = [self.DummyAgent() for i in xrange(num_agents)]
+        self._agents = [DummyAgent() for i in xrange(num_agents)]
         self._dispatcher._agents = list(self._agents)
 
 
@@ -494,33 +494,39 @@
     """
     Test the dispatcher abort functionality.
     """
-    def _check_agent(self, agent, entry_and_host_id, include_host_tasks):
+    def _check_abort_agent(self, agent, entry_id):
         self.assert_(isinstance(agent, monitor_db.Agent))
         tasks = list(agent.queue.queue)
-        self.assert_(len(tasks) > 0)
-
+        self.assertEquals(len(tasks), 1)
         abort = tasks[0]
+
         self.assert_(isinstance(abort, monitor_db.AbortTask))
-        self.assertEquals(abort.queue_entry.id, entry_and_host_id)
+        self.assertEquals(abort.queue_entry.id, entry_id)
 
-        if not include_host_tasks:
-            self.assertEquals(len(tasks), 1)
-            return
 
-        self.assertEquals(len(tasks), 3)
-        cleanup, verify = tasks[1:]
+    def _check_host_agent(self, agent, host_id):
+        self.assert_(isinstance(agent, monitor_db.Agent))
+        tasks = list(agent.queue.queue)
+        self.assertEquals(len(tasks), 2)
+        cleanup, verify = tasks
 
         self.assert_(isinstance(cleanup, monitor_db.CleanupTask))
-        self.assertEquals(cleanup.host.id, entry_and_host_id)
+        self.assertEquals(cleanup.host.id, host_id)
 
         self.assert_(isinstance(verify, monitor_db.VerifyTask))
-        self.assertEquals(verify.host.id, entry_and_host_id)
+        self.assertEquals(verify.host.id, host_id)
 
 
     def _check_agents(self, agents, include_host_tasks):
+        agents = list(agents)
+        if include_host_tasks:
+            self.assertEquals(len(agents), 4)
+            self._check_host_agent(agents.pop(0), 1)
+            self._check_host_agent(agents.pop(1), 2)
+
         self.assertEquals(len(agents), 2)
-        for index, agent in enumerate(agents):
-            self._check_agent(agent, index + 1, include_host_tasks)
+        self._check_abort_agent(agents[0], 1)
+        self._check_abort_agent(agents[1], 2)
 
 
     def test_find_aborting_inactive(self):
@@ -538,7 +544,7 @@
         self._update_hqe(set='status="Abort", active=1')
         # have to make an Agent for the active HQEs
         agent = self.god.create_mock_class(monitor_db.Agent, 'old_agent')
-        agent.queue_entry_ids = [1, 2]
+        agent.host_ids = agent.queue_entry_ids = [1, 2]
         self._dispatcher.add_agent(agent)
 
         self._dispatcher._find_aborting()
@@ -547,9 +553,9 @@
         self.god.check_playback()
 
         # ensure agent gets aborted
-        abort1 = self._dispatcher._agents[0].queue.queue[0]
+        abort1 = self._dispatcher._agents[1].queue.queue[0]
         self.assertEquals(abort1.agents_to_abort, [agent])
-        abort2 = self._dispatcher._agents[1].queue.queue[0]
+        abort2 = self._dispatcher._agents[3].queue.queue[0]
         self.assertEquals(abort2.agents_to_abort, [])
 
 
@@ -596,61 +602,81 @@
 
 
 class PidfileRunMonitorTest(unittest.TestCase):
-    results_dir = '/test/path'
-    pidfile_path = os.path.join(results_dir, monitor_db.AUTOSERV_PID_FILE)
+    execution_tag = 'test_tag'
     pid = 12345
+    process = drone_manager.Process('myhost', pid)
     num_tests_failed = 1
-    args = ('nice -n 10 autoserv -P 123-myuser/myhost -p -n '
-            '-r ' + results_dir + ' -b -u myuser -l my-job-name '
-            '-m myhost /tmp/filejx43Zi -c')
-    bad_args = args.replace(results_dir, '/random/results/dir')
 
     def setUp(self):
         self.god = mock.mock_god()
-        self.god.stub_function(monitor_db, 'open')
-        self.god.stub_function(os.path, 'exists')
-        self.god.stub_function(monitor_db.email_manager,
-                               'enqueue_notify_email')
-        self.monitor = monitor_db.PidfileRunMonitor(self.results_dir)
+        self.mock_drone_manager = self.god.create_mock_class(
+            drone_manager.DroneManager, 'drone_manager')
+        self.god.stub_with(monitor_db, '_drone_manager',
+                           self.mock_drone_manager)
+        self.god.stub_function(email_manager.manager, 'enqueue_notify_email')
+
+        self.pidfile_id = object()
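+        # opaque sentinel standing in for a real pidfile id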
+
+        self.mock_drone_manager.get_pidfile_id_from.expect_call(
+            self.execution_tag).and_return(self.pidfile_id)
+        self.mock_drone_manager.register_pidfile.expect_call(self.pidfile_id)
+
+        self.monitor = monitor_db.PidfileRunMonitor()
+        self.monitor.attach_to_existing_process(self.execution_tag)
 
 
     def tearDown(self):
         self.god.unstub_all()
 
 
+    def setup_pidfile(self, pid=None, exit_code=None, tests_failed=None,
+                      use_second_read=False):
+        contents = drone_manager.PidfileContents()
+        if pid is not None:
+            contents.process = drone_manager.Process('myhost', pid)
+        contents.exit_status = exit_code
+        contents.num_tests_failed = tests_failed
+        self.mock_drone_manager.get_pidfile_contents.expect_call(
+            self.pidfile_id, use_second_read=use_second_read).and_return(
+            contents)
+
+
     def set_not_yet_run(self):
-        os.path.exists.expect_call(self.pidfile_path).and_return(False)
-
-
-    def setup_pidfile(self, pidfile_contents):
-        os.path.exists.expect_call(self.pidfile_path).and_return(True)
-        pidfile = StringIO.StringIO(pidfile_contents)
-        monitor_db.open.expect_call(
-            self.pidfile_path, 'r').and_return(pidfile)
+        self.setup_pidfile()
 
 
     def set_empty_pidfile(self):
-        self.setup_pidfile('')
+        self.setup_pidfile()
 
 
-    def set_running(self):
-        self.setup_pidfile(str(self.pid) + '\n')
+    def set_running(self, use_second_read=False):
+        self.setup_pidfile(self.pid, use_second_read=use_second_read)
 
 
-    def set_complete(self, error_code):
-        self.setup_pidfile(str(self.pid) + '\n' +
-                           str(error_code) + '\n' +
-                           str(self.num_tests_failed))
+    def set_complete(self, error_code, use_second_read=False):
+        self.setup_pidfile(self.pid, error_code, self.num_tests_failed,
+                           use_second_read=use_second_read)
+
+
+    def _check_monitor(self, expected_pid, expected_exit_status,
+                       expected_num_tests_failed):
+        if expected_pid is None:
+            self.assertEquals(self.monitor._state.process, None)
+        else:
+            self.assertEquals(self.monitor._state.process.pid, expected_pid)
+        self.assertEquals(self.monitor._state.exit_status, expected_exit_status)
+        self.assertEquals(self.monitor._state.num_tests_failed,
+                          expected_num_tests_failed)
+
+        self.god.check_playback()
 
 
     def _test_read_pidfile_helper(self, expected_pid, expected_exit_status,
                                   expected_num_tests_failed):
         self.monitor._read_pidfile()
-        self.assertEquals(self.monitor._state.pid, expected_pid)
-        self.assertEquals(self.monitor._state.exit_status, expected_exit_status)
-        self.assertEquals(self.monitor._state.num_tests_failed,
-                          expected_num_tests_failed)
-        self.god.check_playback()
+        self._check_monitor(expected_pid, expected_exit_status,
+                            expected_num_tests_failed)
 
 
     def _get_expected_tests_failed(self, expected_exit_status):
@@ -676,34 +702,24 @@
 
 
     def test_read_pidfile_error(self):
-        self.setup_pidfile('asdf')
-        self.assertRaises(monitor_db.PidfileException,
+        self.mock_drone_manager.get_pidfile_contents.expect_call(
+            self.pidfile_id, use_second_read=False).and_return(
+            drone_manager.InvalidPidfile('error'))
+        self.assertRaises(monitor_db.PidfileRunMonitor._PidfileException,
                           self.monitor._read_pidfile)
         self.god.check_playback()
 
 
-    def setup_proc_cmdline(self, args):
-        proc_cmdline = args.replace(' ', '\x00')
-        proc_file = StringIO.StringIO(proc_cmdline)
-        monitor_db.open.expect_call(
-            '/proc/%d/cmdline' % self.pid, 'r').and_return(proc_file)
-
-
-    def setup_find_autoservs(self, process_dict):
-        self.god.stub_class_method(monitor_db.Dispatcher,
-                                   'find_autoservs')
-        monitor_db.Dispatcher.find_autoservs.expect_call().and_return(
-            process_dict)
+    def setup_is_running(self, is_running):
+        self.mock_drone_manager.is_process_running.expect_call(
+            self.process).and_return(is_running)
 
 
     def _test_get_pidfile_info_helper(self, expected_pid, expected_exit_status,
                                       expected_num_tests_failed):
         self.monitor._get_pidfile_info()
-        self.assertEquals(self.monitor._state.pid, expected_pid)
-        self.assertEquals(self.monitor._state.exit_status, expected_exit_status)
-        self.assertEquals(self.monitor._state.num_tests_failed,
-                          expected_num_tests_failed)
-        self.god.check_playback()
+        self._check_monitor(expected_pid, expected_exit_status,
+                            expected_num_tests_failed)
 
 
     def test_get_pidfile_info(self):
@@ -712,14 +728,13 @@
         """
         # running
         self.set_running()
-        self.setup_proc_cmdline(self.args)
+        self.setup_is_running(True)
         self._test_get_pidfile_info_helper(self.pid, None, None)
 
         # exited during check
         self.set_running()
-        monitor_db.open.expect_call(
-            '/proc/%d/cmdline' % self.pid, 'r').and_raises(IOError)
-        self.set_complete(123) # pidfile gets read again
+        self.setup_is_running(False)
+        self.set_complete(123, use_second_read=True) # pidfile gets read again
         self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)
 
         # completed
@@ -733,10 +748,9 @@
         """
         # running but no proc
         self.set_running()
-        monitor_db.open.expect_call(
-            '/proc/%d/cmdline' % self.pid, 'r').and_raises(IOError)
-        self.set_running()
-        monitor_db.email_manager.enqueue_notify_email.expect_call(
+        self.setup_is_running(False)
+        self.set_running(use_second_read=True)
+        email_manager.manager.enqueue_notify_email.expect_call(
             mock.is_string_comparator(), mock.is_string_comparator())
         self._test_get_pidfile_info_helper(self.pid, 1, 0)
         self.assertTrue(self.monitor.lost_process)
@@ -746,20 +760,19 @@
         """
         pidfile hasn't been written yet
         """
-        # process not running
         self.set_not_yet_run()
-        self.setup_find_autoservs({})
         self._test_get_pidfile_info_helper(None, None, None)
 
-        # process running
-        self.set_not_yet_run()
-        self.setup_find_autoservs({self.pid : self.args})
-        self._test_get_pidfile_info_helper(None, None, None)
 
-        # another process running under same pid
+    def test_process_failed_to_write_pidfile(self):
         self.set_not_yet_run()
-        self.setup_find_autoservs({self.pid : self.bad_args})
-        self._test_get_pidfile_info_helper(None, None, None)
+        email_manager.manager.enqueue_notify_email.expect_call(
+            mock.is_string_comparator(), mock.is_string_comparator())
+        dummy_process = drone_manager.Process('dummy', 12345)
+        self.mock_drone_manager.get_dummy_process.expect_call().and_return(
+            dummy_process)
+        self.monitor._start_time = time.time() - monitor_db.PIDFILE_TIMEOUT - 1
+        self._test_get_pidfile_info_helper(12345, 1, 0)
 
 
 class AgentTest(unittest.TestCase):
@@ -771,13 +784,16 @@
         self.god.unstub_all()
 
 
+    def _create_mock_task(self, name):
+        task = self.god.create_mock_class(monitor_db.AgentTask, name)
+        task.host_ids = task.queue_entry_ids = []
+        return task
+
+
     def test_agent(self):
-        task1 = self.god.create_mock_class(monitor_db.AgentTask,
-                                          'task1')
-        task2 = self.god.create_mock_class(monitor_db.AgentTask,
-                                          'task2')
-        task3 = self.god.create_mock_class(monitor_db.AgentTask,
-                                           'task3')
+        task1 = self._create_mock_task('task1')
+        task2 = self._create_mock_task('task2')
+        task3 = self._create_mock_task('task3')
 
         task1.start.expect_call()
         task1.is_done.expect_call().and_return(False)
@@ -806,27 +822,39 @@
 
 
 class AgentTasksTest(unittest.TestCase):
-    TEMP_DIR = '/temp/dir'
+    TEMP_DIR = '/abspath/tempdir'
     RESULTS_DIR = '/results/dir'
     HOSTNAME = 'myhost'
+    DUMMY_PROCESS = object()
     HOST_PROTECTION = host_protections.default
+    PIDFILE_ID = object()
 
     def setUp(self):
         self.god = mock.mock_god()
-        self.god.stub_with(tempfile, 'mkdtemp',
-                           mock.mock_function('mkdtemp', self.TEMP_DIR))
-        self.god.stub_with(os.path, 'exists',
-                           mock.mock_function('exists', True))
-        self.god.stub_with(shutil, 'rmtree', mock.mock_function('rmtree', None))
-        self.god.stub_class_method(monitor_db.RunMonitor, 'run')
-        self.god.stub_class_method(monitor_db.RunMonitor, 'exit_code')
-        self.god.stub_class_method(monitor_db.PreJobTask, '_move_results')
+        self.god.stub_with(drone_manager.DroneManager, 'get_temporary_path',
+                           mock.mock_function('get_temporary_path',
+                                              default_return_val='tempdir'))
+        self.god.stub_function(drone_manager.DroneManager,
+                               'copy_to_results_repository')
+        self.god.stub_function(drone_manager.DroneManager,
+                               'get_pidfile_id_from')
+
+        def dummy_absolute_path(self, path):
+            return '/abspath/' + path
+        self.god.stub_with(drone_manager.DroneManager, 'absolute_path',
+                           dummy_absolute_path)
+
+        self.god.stub_class_method(monitor_db.PidfileRunMonitor, 'run')
+        self.god.stub_class_method(monitor_db.PidfileRunMonitor, 'exit_code')
+        self.god.stub_class_method(monitor_db.PidfileRunMonitor, 'get_process')
         self.host = self.god.create_mock_class(monitor_db.Host, 'host')
+        self.host.id = 1
         self.host.hostname = self.HOSTNAME
         self.host.protection = self.HOST_PROTECTION
         self.queue_entry = self.god.create_mock_class(
             monitor_db.HostQueueEntry, 'queue_entry')
         self.job = self.god.create_mock_class(monitor_db.Job, 'job')
+        self.queue_entry.id = 1
         self.queue_entry.job = self.job
         self.queue_entry.host = self.host
         self.queue_entry.meta_host = None
@@ -857,12 +885,31 @@
         self.assertEquals(task.success, success)
 
 
-    def setup_run_monitor(self, exit_status):
-        monitor_db.RunMonitor.run.expect_call()
-        monitor_db.RunMonitor.exit_code.expect_call()
-        monitor_db.RunMonitor.exit_code.expect_call().and_return(
+    def setup_run_monitor(self, exit_status, copy_log_file=True):
+        monitor_db.PidfileRunMonitor.run.expect_call(
+            mock.is_instance_comparator(list),
+            'tempdir',
+            nice_level=monitor_db.AUTOSERV_NICE_LEVEL,
+            log_file=mock.anything_comparator())
+        monitor_db.PidfileRunMonitor.exit_code.expect_call()
+        monitor_db.PidfileRunMonitor.exit_code.expect_call().and_return(
             exit_status)
 
+        if copy_log_file:
+            self._setup_move_logfile()
+
+
+    def _setup_move_logfile(self, include_destination=False):
+        monitor_db.PidfileRunMonitor.get_process.expect_call().and_return(
+            self.DUMMY_PROCESS)
+        if include_destination:
+            drone_manager.DroneManager.copy_to_results_repository.expect_call(
+                self.DUMMY_PROCESS, mock.is_string_comparator(),
+                destination_path=mock.is_string_comparator())
+        else:
+            drone_manager.DroneManager.copy_to_results_repository.expect_call(
+                self.DUMMY_PROCESS, mock.is_string_comparator())
+
 
     def _test_repair_task_helper(self, success):
         self.host.set_status.expect_call('Repairing')
@@ -882,10 +929,10 @@
         expected_protection = host_protections.Protection.get_attr_name(
             expected_protection)
 
-        self.assertTrue(set(task.monitor.cmd) >=
-                        set(['autoserv', '-R', '-m', self.HOSTNAME, '-r',
-                             self.TEMP_DIR, '--host-protection',
-                             expected_protection]))
+        self.assertTrue(set(task.cmd) >=
+                        set([monitor_db._autoserv_path, '-p', '-R', '-m',
+                             self.HOSTNAME, '-r', self.TEMP_DIR,
+                             '--host-protection', expected_protection]))
         self.god.check_playback()
 
 
@@ -908,7 +955,6 @@
 
     def setup_verify_expects(self, success, use_queue_entry):
         if use_queue_entry:
-            self.queue_entry.clear_results_dir.expect_call()
             self.queue_entry.set_status.expect_call('Verifying')
         self.host.set_status.expect_call('Verifying')
         if success:
@@ -920,7 +966,8 @@
             self.setup_run_monitor(1)
             if use_queue_entry and not self.queue_entry.meta_host:
                 self.queue_entry.set_execution_subdir.expect_call()
-                monitor_db.VerifyTask._move_results.expect_call()
+                self.queue_entry.execution_tag.expect_call().and_return('tag')
+                self._setup_move_logfile(include_destination=True)
 
 
     def _check_verify_failure_tasks(self, verify_task):
@@ -939,15 +986,14 @@
         self.setup_verify_expects(success, use_queue_entry)
 
         if use_queue_entry:
-            task = monitor_db.VerifyTask(
-                queue_entry=self.queue_entry)
+            task = monitor_db.VerifyTask(queue_entry=self.queue_entry)
         else:
             task = monitor_db.VerifyTask(host=self.host)
         self._check_verify_failure_tasks(task)
         self.run_task(task, success)
-        self.assertTrue(set(task.monitor.cmd) >=
-                        set(['autoserv', '-v', '-m', self.HOSTNAME, '-r',
-                        self.TEMP_DIR]))
+        self.assertTrue(set(task.cmd) >=
+                        set([monitor_db._autoserv_path, '-p', '-v', '-m',
+                             self.HOSTNAME, '-r', self.TEMP_DIR]))
         self.god.check_playback()
 
 
@@ -969,6 +1015,7 @@
     def test_abort_task(self):
         queue_entry = self.god.create_mock_class(monitor_db.HostQueueEntry,
                                                  'queue_entry')
+        queue_entry.id = 1
         queue_entry.host_id, queue_entry.job_id = 1, 2
         task = self.god.create_mock_class(monitor_db.AgentTask, 'task')
         agent = self.god.create_mock_class(monitor_db.Agent, 'agent')
@@ -982,45 +1029,61 @@
         self.god.check_playback()
 
 
-    def _setup_pre_parse_expects(self, is_synch, num_machines):
-        self.queue_entry.results_dir.expect_call().and_return(self.RESULTS_DIR)
+    def _setup_pre_parse_expects(self, autoserv_success):
+        self.queue_entry.execution_tag.expect_call().and_return('tag')
+        self.pidfile_monitor = monitor_db.PidfileRunMonitor.expect_new()
+        self.pidfile_monitor.pidfile_id = self.PIDFILE_ID
+        self.pidfile_monitor.attach_to_existing_process.expect_call('tag')
+        if autoserv_success:
+            code = 0
+        else:
+            code = 1
+        self.pidfile_monitor.exit_code.expect_call().and_return(code)
+
         self.queue_entry.set_status.expect_call('Parsing')
 
 
     def _setup_post_parse_expects(self, autoserv_success):
-        pidfile_monitor = monitor_db.PidfileRunMonitor.expect_new(
-            self.RESULTS_DIR)
         if autoserv_success:
-            code, status = 0, 'Completed'
+            status = 'Completed'
         else:
-            code, status = 1, 'Failed'
-        pidfile_monitor.exit_code.expect_call().and_return(code)
+            status = 'Failed'
         self.queue_entry.set_status.expect_call(status)
 
 
-    def _test_final_reparse_task_helper(self, is_synch=False, num_machines=1,
-                                        autoserv_success=True):
-        tko_dir = '/tko/dir'
-        monitor_db.AUTOTEST_TKO_DIR = tko_dir
-        parse_path = os.path.join(tko_dir, 'parse')
+    def setup_reparse_run_monitor(self):
+        monitor = monitor_db.PidfileRunMonitor.expect_new()
+        monitor.run.expect_call(
+            mock.is_instance_comparator(list),
+            'tag',
+            log_file=mock.anything_comparator(),
+            pidfile_name='.parser_execute',
+            paired_with_pidfile=self.PIDFILE_ID)
+        monitor.exit_code.expect_call()
+        monitor.exit_code.expect_call().and_return(0)
+        monitor.get_process.expect_call().and_return(self.DUMMY_PROCESS)
+        drone_manager.DroneManager.copy_to_results_repository.expect_call(
+                self.DUMMY_PROCESS, mock.is_string_comparator())
 
-        self._setup_pre_parse_expects(is_synch, num_machines)
-        self.setup_run_monitor(0)
+
+    def _test_final_reparse_task_helper(self, autoserv_success=True):
+        self._setup_pre_parse_expects(autoserv_success)
+        self.setup_reparse_run_monitor()
         self._setup_post_parse_expects(autoserv_success)
 
         task = monitor_db.FinalReparseTask([self.queue_entry])
         self.run_task(task, True)
 
         self.god.check_playback()
-        cmd = [parse_path, '-l', '2', '-r', '-o', self.RESULTS_DIR]
+        cmd = [monitor_db._parser_path, '--write-pidfile', '-l', '2', '-r',
+               '-o', '/abspath/tag']
         self.assertEquals(task.cmd, cmd)
 
 
     def test_final_reparse_task(self):
         self.god.stub_class(monitor_db, 'PidfileRunMonitor')
         self._test_final_reparse_task_helper()
-        self._test_final_reparse_task_helper(num_machines=2)
-        self._test_final_reparse_task_helper(is_synch=True)
         self._test_final_reparse_task_helper(autoserv_success=False)
 
 
@@ -1029,12 +1092,12 @@
         self.god.stub_function(monitor_db.FinalReparseTask,
                                '_can_run_new_parse')
 
-        self._setup_pre_parse_expects(False, 1)
+        self._setup_pre_parse_expects(True)
         monitor_db.FinalReparseTask._can_run_new_parse.expect_call().and_return(
             False)
         monitor_db.FinalReparseTask._can_run_new_parse.expect_call().and_return(
             True)
-        self.setup_run_monitor(0)
+        self.setup_reparse_run_monitor()
         self._setup_post_parse_expects(True)
 
         task = monitor_db.FinalReparseTask([self.queue_entry])
@@ -1045,7 +1108,6 @@
     def _test_cleanup_task_helper(self, success, use_queue_entry=False):
         if use_queue_entry:
             self.queue_entry.get_host.expect_call().and_return(self.host)
-            self.queue_entry.clear_results_dir.expect_call()
         self.host.set_status.expect_call('Cleaning')
         if success:
             self.setup_run_monitor(0)
@@ -1055,7 +1117,8 @@
             self.setup_run_monitor(1)
             if use_queue_entry and not self.queue_entry.meta_host:
                 self.queue_entry.set_execution_subdir.expect_call()
-                monitor_db.VerifyTask._move_results.expect_call()
+                self.queue_entry.execution_tag.expect_call().and_return('tag')
+                self._setup_move_logfile(include_destination=True)
 
         if use_queue_entry:
             task = monitor_db.CleanupTask(queue_entry=self.queue_entry)
@@ -1070,9 +1133,9 @@
         self.run_task(task, success)
 
         self.god.check_playback()
-        self.assert_(set(task.monitor.cmd) >=
-                        set(['autoserv', '--cleanup', '-m', self.HOSTNAME,
-                             '-r', self.TEMP_DIR]))
+        self.assert_(set(task.cmd) >=
+                        set([monitor_db._autoserv_path, '-p', '--cleanup', '-m',
+                             self.HOSTNAME, '-r', self.TEMP_DIR]))
 
     def test_cleanup_task(self):
         self._test_cleanup_task_helper(True)
@@ -1086,16 +1149,15 @@
 class JobTest(BaseSchedulerTest):
     def setUp(self):
         super(JobTest, self).setUp()
-        self.god.stub_function(os.path, 'exists')
-        self.god.stub_function(monitor_db, 'ensure_directory_exists')
+        self.god.stub_with(
+            drone_manager.DroneManager, 'attach_file_to_execution',
+            mock.mock_function('attach_file_to_execution',
+                               default_return_val='/test/path/tmp/foo'))
 
 
     def _setup_directory_expects(self, execution_subdir):
         job_path = os.path.join('.', '1-my_user')
         results_dir = os.path.join(job_path, execution_subdir)
-        monitor_db.ensure_directory_exists.expect_call(job_path)
-        os.path.exists.expect_call(results_dir)
-        monitor_db.ensure_directory_exists.expect_call(results_dir)
 
 
     def _test_run_helper(self, expect_agent=True, expect_starting=False,
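
For context on what the reworked PidfileRunMonitor tests above exercise: autoserv
now reports through a pidfile managed by the drone manager, and the monitor must
handle the race where the process exits between the pidfile read and the liveness
check (hence the use_second_read expectations). A minimal sketch of that polling
protocol, using the same call names the tests mock; the function itself is a
hypothetical stand-in, not the actual monitor_db implementation:

    # Sketch only: mirrors the protocol the tests above play back.
    def poll_pidfile(drone_manager, pidfile_id):
        state = drone_manager.get_pidfile_contents(pidfile_id,
                                                   use_second_read=False)
        if state.process is None or state.exit_status is not None:
            return state  # not yet started, or already recorded as complete
        if drone_manager.is_process_running(state.process):
            return state  # still running normally
        # The process vanished after the first read; it may have just exited,
        # so read the pidfile once more to pick up the exit status it wrote.
        state = drone_manager.get_pidfile_contents(pidfile_id,
                                                   use_second_read=True)
        if state.exit_status is None:
            # pidfile still claims "running" but the process is gone:
            # treat it as a lost process (exit status 1, no failed tests).
            state.exit_status, state.num_tests_failed = 1, 0
        return state
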
diff --git a/server/hosts/abstract_ssh.py b/server/hosts/abstract_ssh.py
index 9a5b38f..c6707d1 100644
--- a/server/hosts/abstract_ssh.py
+++ b/server/hosts/abstract_ssh.py
@@ -1,5 +1,5 @@
 import os, sys, time, types, socket, traceback, shutil
-from autotest_lib.client.common_lib import error
+from autotest_lib.client.common_lib import error, debug
 from autotest_lib.server import utils, autotest
 from autotest_lib.server.hosts import site_host
 
@@ -41,7 +41,8 @@
         format (%s@%s:%s).
         """
 
-        print '_copy_files: copying %s to %s' % (sources, dest)
+        debug.get_logger().debug('_copy_files: copying %s to %s' %
+                                 (sources, dest))
         try:
             ssh = make_ssh_command(self.user, self.port)
             if delete_dest:
@@ -289,3 +290,12 @@
                 self.machine_install()
             except NotImplementedError, e:
                 sys.stderr.write(str(e) + "\n\n")
+
+
+class LoggerFile(object):
+    def write(self, data):
+        debug.get_logger().debug(data)
+
+
+    def flush(self):
+        pass
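
LoggerFile gives any API expecting a writable stream a way to feed the debug
logger instead of a real file. A hypothetical usage sketch (the command is a
placeholder; utils.run and its *_tee parameters are as defined in this patch):

    # Route a command's output into the debug log rather than stdout.
    from autotest_lib.server.hosts import abstract_ssh
    from autotest_lib.server import utils

    log_stream = abstract_ssh.LoggerFile()
    utils.run('uname -a', stdout_tee=log_stream, stderr_tee=log_stream)
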
diff --git a/server/hosts/paramiko_host.py b/server/hosts/paramiko_host.py
index 68643a5..4912881 100644
--- a/server/hosts/paramiko_host.py
+++ b/server/hosts/paramiko_host.py
@@ -101,8 +101,8 @@
         """
 
-        # tee to std* if no tees are provided
+        # tee to the debug logger if no tees are provided
-        stdout = stdout_tee or sys.stdout
-        stderr = stderr_tee or sys.stdout
+        stdout = stdout_tee or abstract_ssh.LoggerFile()
+        stderr = stderr_tee or abstract_ssh.LoggerFile()
         self.host_log.debug("ssh-paramiko: %s" % command)
 
         # start up the command
diff --git a/server/hosts/ssh_host.py b/server/hosts/ssh_host.py
index e839b3e..4887512 100644
--- a/server/hosts/ssh_host.py
+++ b/server/hosts/ssh_host.py
@@ -63,7 +63,7 @@
 
 
     def _run(self, command, timeout, ignore_status, stdout, stderr,
-             connect_timeout, env, options):
+             connect_timeout, env, options, stdin=None):
         """Helper function for run()."""
         ssh_cmd = self.ssh_command(connect_timeout, options)
         echo_cmd = "echo \`date '+%m/%d/%y %H:%M:%S'\` Connected. >&2"
@@ -74,7 +74,7 @@
         full_cmd = '%s "%s;%s %s"' % (ssh_cmd, echo_cmd, env,
                                       utils.sh_escape(command))
         result = utils.run(full_cmd, timeout, True, stdout, stderr,
-                           verbose=False)
+                           verbose=False, stdin=stdin)
 
         # The error messages will show up in band (indistinguishable
         # from stuff sent through the SSH connection), so we have the
@@ -96,7 +96,8 @@
 
 
     def run(self, command, timeout=3600, ignore_status=False,
-            stdout_tee=None, stderr_tee=None, connect_timeout=30, options=''):
+            stdout_tee=None, stderr_tee=None, connect_timeout=30, options='',
+            stdin=None):
         """
         Run a command on the remote host.
 
@@ -108,6 +109,7 @@
                      to complete if it has to kill the process.
             ignore_status: do not raise an exception, no matter
                      what the exit code of the command is.
+            stdin: stdin to pass to the executed process
 
         Returns:
             a utils.CmdResult object
@@ -117,20 +119,22 @@
                               execution was not 0
             AutoservSSHTimeout: ssh connection has timed out
         """
-        stdout = stdout_tee or sys.stdout
-        stderr = stderr_tee or sys.stdout
+        stdout = stdout_tee or abstract_ssh.LoggerFile()
+        stderr = stderr_tee or abstract_ssh.LoggerFile()
         self.ssh_host_log.debug("ssh: %s" % command)
         env = " ".join("=".join(pair) for pair in self.env.iteritems())
         try:
             try:
                 return self._run(command, timeout, ignore_status, stdout,
-                                 stderr, connect_timeout, env, options)
+                                 stderr, connect_timeout, env, options,
+                                 stdin=stdin)
             except PermissionDeniedError:
-                print("Permission denied to ssh; re-running"
-                      "with increased logging:")
+                print >>stdout, ("Permission denied to ssh; re-running "
+                                 "with increased logging:")
                 try:
                     self._run(command, timeout, ignore_status, stdout,
-                                 stderr, connect_timeout, env, '-v -v -v')
+                              stderr, connect_timeout, env, '-v -v -v',
+                              stdin=stdin)
                 except Exception:
                     pass
                 raise
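
The stdin support added above simply threads the new parameter from run()
through _run() into utils.run() (see the server/utils.py pass-through below).
A self-contained sketch of what this amounts to at the bottom of the stack,
assuming the value is ultimately written to the child process's standard input:

    # Assumption: the stdin= argument ends up fed to the child's stdin.
    import subprocess

    proc = subprocess.Popen(['wc', '-l'], stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE)
    out, _ = proc.communicate('one\ntwo\nthree\n')
    print out.strip()   # prints "3"
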
diff --git a/server/server_job.py b/server/server_job.py
index 51fbe2b..5580b66 100755
--- a/server/server_job.py
+++ b/server/server_job.py
@@ -154,7 +154,8 @@
             os.unlink(self.status)
         job_data = {'label' : label, 'user' : user,
                     'hostname' : ','.join(machines),
-                    'status_version' : str(self.STATUS_VERSION)}
+                    'status_version' : str(self.STATUS_VERSION),
+                    'job_started' : str(int(time.time()))}
         if self.resultdir:
             job_data.update(get_site_job_data(self))
             utils.write_keyval(self.resultdir, job_data)
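
job_started is recorded as integer seconds since the epoch alongside the other
job keyvals. A minimal sketch of reading it back, assuming the usual
one-key=value-per-line format that utils.write_keyval produces (the path below
is a placeholder):

    import time

    def read_job_started(keyval_path):
        # Scan the keyval file for the job_started entry.
        for line in open(keyval_path):
            key, _, value = line.strip().partition('=')
            if key == 'job_started':
                return time.localtime(int(value))
        return None

    # e.g. time.strftime('%m/%d/%y %H:%M:%S',
    #                    read_job_started('results/1-myuser/keyval'))
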
diff --git a/server/utils.py b/server/utils.py
index 3e80430..d5991f5 100644
--- a/server/utils.py
+++ b/server/utils.py
@@ -24,9 +24,9 @@
 
 ############# we need pass throughs for the methods in client/common_lib/utils
 def run(command, timeout=None, ignore_status=False,
-        stdout_tee=None, stderr_tee=None, verbose=True):
+        stdout_tee=None, stderr_tee=None, verbose=True, stdin=None):
     return utils.run(command, timeout, ignore_status,
-                     stdout_tee, stderr_tee, verbose)
+                     stdout_tee, stderr_tee, verbose, stdin=stdin)
 
 
 def system(command, timeout=None, ignore_status=False):
diff --git a/tko/parse b/tko/parse
index 8379db3..6854e95 100755
--- a/tko/parse
+++ b/tko/parse
@@ -1,9 +1,5 @@
 #!/usr/bin/python -u
 
-import os, sys
-
-# this is a stub that just execs into parse.py
-mypath = os.path.dirname(os.path.abspath(__file__))
-parse_py = os.path.join(mypath, "parse.py")
-
-os.execv(parse_py, [parse_py] + sys.argv[1:])
+import common
+from autotest_lib.tko import parse
+parse.main()
diff --git a/tko/retrieve_logs.cgi b/tko/retrieve_logs.cgi
index 157bf8a..5cb3a22 100755
--- a/tko/retrieve_logs.cgi
+++ b/tko/retrieve_logs.cgi
@@ -1,6 +1,8 @@
 #!/usr/bin/python
 
-import cgi, os, sys
+import cgi, os, sys, urllib2
+import common
+from autotest_lib.client.common_lib import global_config
 
 page = """\
 Status: 302 Found
@@ -16,12 +18,12 @@
 
 # Define function for retrieving logs
 try:
-	import site_retrieve_logs
-	retrieve_logs = site_retrieve_logs.retrieve_logs
-	del site_retrieve_logs
+    import site_retrieve_logs
+    retrieve_logs = site_retrieve_logs.retrieve_logs
+    del site_retrieve_logs
 except ImportError:
-	def retrieve_logs(job_path):
-		pass
+    def retrieve_logs(job_path):
+        pass
 
 # Get form fields
 form = cgi.FieldStorage(keep_blank_values=True)
@@ -30,12 +32,27 @@
 job_path = os.path.join(autodir, job_path)
 keyval = retrieve_logs(job_path)
 
-# Redirect to results page
-testname = ''
-if 'test' in form:
-	testname = form['test'].value
-	full_path = os.path.join(job_path, form['test'].value)
-	if not os.path.exists(full_path):
-		testname = ''
-path = "%s%s" % (form['job'].value, testname)
-print page % path
+
+def find_repo(job_path):
+    """Find the machine holding the given logs and return a URL to the logs"""
+    config = global_config.global_config
+    drones = config.get_config_value('SCHEDULER', 'drones')
+    results_host = config.get_config_value('SCHEDULER', 'results_host')
+    if drones and results_host and results_host != 'localhost':
+        drone_list = [hostname.strip() for hostname in drones.split(',')]
+        results_repos = [results_host] + drone_list
+        for drone in results_repos:
+            http_path = 'http://%s%s' % (drone, form['job'].value)
+            try:
+                urllib2.urlopen(http_path).read()
+                return http_path
+            except urllib2.URLError:
+                pass
+        # fall back to the results repository if no machine had the logs
+        return 'http://%s%s' % (results_host, form['job'].value)
+    else:
+        # local single-machine setup: serve the path directly
+        return form['job'].value
+
+
+print page % find_repo(job_path)
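
find_repo() keys off the same SCHEDULER options the distributed setup uses:
when drones and a non-local results_host are configured, it probes each machine
over HTTP for the requested job path, falling back to the results repository.
A hypothetical global_config excerpt that would enable the multi-machine branch
(hostnames are placeholders):

    [SCHEDULER]
    drones: drone1.example.com, drone2.example.com
    results_host: results.example.com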