[autotest] Scheduler refactor.

Break scheduler into simpler modules.
This change also modifies run_pylint to check for undefined variables.

BUG=chromium:312338
TEST=Ran smoke suite against multiple DUTs.
     Triggered agents such as repair and verify. Ran pylint and unittests.
DEPLOY=scheduler

Change-Id: Ibd685a27b5b50abd26cdf2976ac4189c3e9acc0a
Reviewed-on: https://chromium-review.googlesource.com/174080
Commit-Queue: Prashanth B <beeps@chromium.org>
Tested-by: Prashanth B <beeps@chromium.org>
Reviewed-by: Alex Miller <milleral@chromium.org>
diff --git a/scheduler/agent_task.py b/scheduler/agent_task.py
new file mode 100644
index 0000000..7cd28f6
--- /dev/null
+++ b/scheduler/agent_task.py
@@ -0,0 +1,633 @@
+#pylint: disable-msg=C0111
+
+""" This is the module for everything related to the AgentTask.
+
+The BaseAgentTask imposes an interface through which the scheduler can monitor
+a process; examples of such processes include Verify, Cleanup and the
+QueueTasks that run the tests. The scheduler itself only understands Agents.
+Agents:
+    The Agent is the bridge between the scheduler and the AgentTask. The
+    scheduler's tick has a method called handle_agents, which calls the
+    tick of each agent in the Dispatcher's queue. This leads to the Agent
+    polling its AgentTask. The scheduler will keep polling a task through
+    the associated Agent until the Agent is removed from the dispatcher.
+
+    At a high level:
+        agents finished = tasks done
+        agent polls till finished
+            task polls till done
+                task sets done
+        agent is removed from dispatcher
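+
+    A rough sketch of that loop (illustrative only; aside from agent.tick()
+    and agent.finished, the names here are simplified stand-ins):
+
+        # inside the scheduler's tick (handle_agents)
+        for agent in list(dispatcher_agents):
+            agent.tick()          # in turn polls the agent's AgentTask
+            if agent.finished:
+                dispatcher_agents.remove(agent)
+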
+AgentTasks:
+    Basic AgentTasks are created when an HQE changes state. Examples of these
+    are the QueueTask, which is created when an HQE goes into the Starting
+    state, and the FinalReparseTask, which is created when the HQE goes into
+    Parsing.
+SpecialAgentTasks:
+    Unlike AgentTasks, SpecialAgentTasks are only created when a row is
+    inserted in the afe_special_tasks table. All PrejobTasks are
+    SpecialAgentTasks.
+
+monitor_db.get_agent_task_for_special_task/get_agent_task_for_queue_entry map
+an AgentTask to an Agent, which the scheduler understands. From this point
+onward, the scheduler manages the task through the Agent's interface.
+At a high level:
+    task poll
+        start
+            prolog
+        tick till we get an exit code
+        finished(exit==0)
+            done=True
+            epilog
+                cleanup
+                    set is_active, is_complete, success (checked in scheduler)
+
+The first special task for an HQE is usually Reset.
+- poll: The first poll will start the task; polls thereafter will call the
+        task's tick method. A started task will have the started bit set.
+- start: Call prolog, run the process and set the started bit.
+    - prolog: Usually where one puts any model state changes that happen before
+              the actual task. This differs per task. Examples of things that
+              might happen in a prolog:
+                  - set the status of the Host/HQE (to something like Resetting)
+                  - delete any unwanted queued special tasks
+                  - register a pidfile
+                  - set the is_active bit on the special task
+    - run:
+        - create a PidfileRunMonitor
+        - pass the autoserv command, working directory etc to drone manager.
+          This will start the actual autoserv process.
+    - set the started bit: so subsequent polls do not 'start' again
+
+- tick: For as long as a started task's done bit is not set, a poll will lead
+        to a tick. The tick monitors the pidfile of the autoserv process
+        running on the drone through the PidfileRunMonitor created in run.
+        If the autoserv process has finished, we call finished with True/False
+        depending on the autoserv exit code.
+
+        - finished: sets the done and success values, then calls epilog. The
+                    done bit is important because the Agent polls it to know
+                    when its task has finished.
+
+            - epilog: Is generally where we set the status of the Host/HQE
+                      again, requeue any other task that needs to run after
+                      this one, and perform cleanup. Just like the prolog, this
+                      step is different per task.
+
+                      - cleanup: Sets the is_active, is_complete and success
+                                 states on the task's model. Also uses the
+                                 drone_manager to:
+                                    unregister the pidfile
+                                    copy results of the task
+                                 (Note this is not to be confused with the
+                                  special task called Cleanup.)
+
+                      The actions we take in the epilog are based on the
+                      success/failure of the autoserv process set in cleanup,
+                      e.g. if reset failed we will enqueue a repair, but if all
+                      is well the epilog will just return. Prejob task epilogs
+                      also call the HQE's on_pending method, which changes the
+                      status of the HQE to pending/starting; this gets picked
+                      up in the scheduler.
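+
+A minimal, illustrative sketch of this flow (simplified from the poll/tick
+methods defined below in BaseAgentTask; not the literal method bodies):
+
+    def poll(self):
+        if not self.started:
+            self.start()        # prolog() + run(); sets self.started
+        if not self.done:
+            self.tick()
+
+    def tick(self):
+        exit_code = self.monitor.exit_code()
+        if exit_code is not None:
+            # sets done/success and calls epilog()
+            self.finished(exit_code == 0)
+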
+By this point the is_done flag is set, which results in the Agent noticing that
+the task has finished and unregistering it from the dispatcher.
+
+Class hierarchy:
+BaseAgentTask
+ |--->SpecialAgentTask (prejob_task.py)
+      |--->RepairTask
+      |--->PreJobTask
+           |--->Verify, Cleanup, Reset, Provision
+
+ |--->AbstractQueueTask (monitor_db.py)
+      |--->QueueTask
+      |--->HostlessQueueTask
+
+ |--->PostJobTask (postjob_task.py)
+      |--->GatherLogsTask
+      |--->SelfThrottledPostJobTask
+            |--->FinalReparseTask
+            |--->ArchiveResultsTask
+
+"""
+
+import logging
+import os
+import urllib
+import time
+
+from autotest_lib.frontend.afe import models
+from autotest_lib.scheduler import drone_manager, pidfile_monitor
+from autotest_lib.client.common_lib import utils
+from autotest_lib.scheduler import email_manager, host_scheduler
+from autotest_lib.scheduler import scheduler_models
+from autotest_lib.server import autoserv_utils
+
+
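+# Shared drone_manager handle used by the tasks in this module.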
+_drone_manager = drone_manager.instance()
+
+AUTOSERV_NICE_LEVEL = 10
+
+
+class BaseAgentTask(object):
+    class _NullMonitor(object):
+        pidfile_id = None
+
+        def has_process(self):
+            return True
+
+
+    def __init__(self, log_file_name=None):
+        """
+        @param log_file_name: (optional) name of file to log command output to
+        """
+        self.done = False
+        self.started = False
+        self.success = None
+        self.aborted = False
+        self.monitor = None
+        self.queue_entry_ids = []
+        self.host_ids = []
+        self._log_file_name = log_file_name
+
+
+    def _set_ids(self, host=None, queue_entries=None):
+        if queue_entries and queue_entries != [None]:
+            self.host_ids = [entry.host.id for entry in queue_entries]
+            self.queue_entry_ids = [entry.id for entry in queue_entries]
+        else:
+            assert host
+            self.host_ids = [host.id]
+
+
+    def poll(self):
+        if not self.started:
+            self.start()
+        if not self.done:
+            self.tick()
+
+
+    def tick(self):
+        assert self.monitor
+        exit_code = self.monitor.exit_code()
+        if exit_code is None:
+            return
+
+        success = (exit_code == 0)
+        self.finished(success)
+
+
+    def is_done(self):
+        return self.done
+
+
+    def finished(self, success):
+        if self.done:
+            assert self.started
+            return
+        self.started = True
+        self.done = True
+        self.success = success
+        self.epilog()
+
+
+    def prolog(self):
+        """
+        To be overridden.
+        """
+        assert not self.monitor
+        self.register_necessary_pidfiles()
+
+
+    def _log_file(self):
+        if not self._log_file_name:
+            return None
+        return os.path.join(self._working_directory(), self._log_file_name)
+
+
+    def cleanup(self):
+        log_file = self._log_file()
+        if self.monitor and log_file:
+            self.monitor.try_copy_to_results_repository(log_file)
+
+
+    def epilog(self):
+        """
+        To be overridden.
+        """
+        self.cleanup()
+        logging.info("%s finished with success=%s", type(self).__name__,
+                     self.success)
+
+
+    def start(self):
+        if not self.started:
+            self.prolog()
+            self.run()
+
+        self.started = True
+
+
+    def abort(self):
+        if self.monitor:
+            self.monitor.kill()
+        self.done = True
+        self.aborted = True
+        self.cleanup()
+
+
+    def _get_consistent_execution_path(self, execution_entries):
+        first_execution_path = execution_entries[0].execution_path()
+        for execution_entry in execution_entries[1:]:
+            assert execution_entry.execution_path() == first_execution_path, (
+                '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
+                                        execution_entry,
+                                        first_execution_path,
+                                        execution_entries[0]))
+        return first_execution_path
+
+
+    def _copy_results(self, execution_entries, use_monitor=None):
+        """
+        @param execution_entries: list of objects with execution_path() method
+        """
+        if use_monitor is not None and not use_monitor.has_process():
+            return
+
+        assert len(execution_entries) > 0
+        if use_monitor is None:
+            assert self.monitor
+            use_monitor = self.monitor
+        assert use_monitor.has_process()
+        execution_path = self._get_consistent_execution_path(execution_entries)
+        results_path = execution_path + '/'
+        use_monitor.try_copy_to_results_repository(results_path)
+
+
+    def _parse_results(self, queue_entries):
+        for queue_entry in queue_entries:
+            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
+
+
+    def _archive_results(self, queue_entries):
+        for queue_entry in queue_entries:
+            queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
+
+
+    def _command_line(self):
+        """
+        Return the command line to run.  Must be overridden.
+        """
+        raise NotImplementedError
+
+
+    @property
+    def num_processes(self):
+        """
+        Return the number of processes forked by this BaseAgentTask's process.
+        It may only be approximate.  To be overridden if necessary.
+        """
+        return 1
+
+
+    def _paired_with_monitor(self):
+        """
+        If this BaseAgentTask's process must run on the same machine as some
+        previous process, this method should be overridden to return a
+        PidfileRunMonitor for that process.
+        """
+        return self._NullMonitor()
+
+
+    @property
+    def owner_username(self):
+        """
+        Return login of user responsible for this task.  May be None.  Must be
+        overridden.
+        """
+        raise NotImplementedError
+
+
+    def _working_directory(self):
+        """
+        Return the directory where this BaseAgentTask's process executes.
+        Must be overridden.
+        """
+        raise NotImplementedError
+
+
+    def _pidfile_name(self):
+        """
+        Return the name of the pidfile this BaseAgentTask's process uses.  To be
+        overridden if necessary.
+        """
+        return drone_manager.AUTOSERV_PID_FILE
+
+
+    def _check_paired_results_exist(self):
+        if not self._paired_with_monitor().has_process():
+            email_manager.manager.enqueue_notify_email(
+                    'No paired results in task',
+                    'No paired results in task %s at %s'
+                    % (self, self._paired_with_monitor().pidfile_id))
+            self.finished(False)
+            return False
+        return True
+
+
+    def _create_monitor(self):
+        assert not self.monitor
+        self.monitor = pidfile_monitor.PidfileRunMonitor()
+
+
+    def run(self):
+        if not self._check_paired_results_exist():
+            return
+
+        self._create_monitor()
+        self.monitor.run(
+                self._command_line(), self._working_directory(),
+                num_processes=self.num_processes,
+                nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
+                pidfile_name=self._pidfile_name(),
+                paired_with_pidfile=self._paired_with_monitor().pidfile_id,
+                username=self.owner_username,
+                drone_hostnames_allowed=self.get_drone_hostnames_allowed())
+
+
+    def get_drone_hostnames_allowed(self):
+        if not models.DroneSet.drone_sets_enabled():
+            return None
+
+        hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
+        if not hqes:
+            # Only special tasks could be missing host queue entries
+            assert isinstance(self, SpecialAgentTask)
+            return self._user_or_global_default_drone_set(
+                    self.task, self.task.requested_by)
+
+        job_ids = hqes.values_list('job', flat=True).distinct()
+        assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
+                                      "span multiple jobs")
+
+        job = models.Job.objects.get(id=job_ids[0])
+        drone_set = job.drone_set
+        if not drone_set:
+            return self._user_or_global_default_drone_set(job, job.user())
+
+        return drone_set.get_drone_hostnames()
+
+
+    def _user_or_global_default_drone_set(self, obj_with_owner, user):
+        """
+        Returns the user's default drone set, if present.
+
+        Otherwise, returns the global default drone set.
+        """
+        default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
+        if not user:
+            logging.warn('%s had no owner; using default drone set',
+                         obj_with_owner)
+            return default_hostnames
+        if not user.drone_set:
+            logging.warn('User %s has no default drone set, using global '
+                         'default', user.login)
+            return default_hostnames
+        return user.drone_set.get_drone_hostnames()
+
+
+    def register_necessary_pidfiles(self):
+        pidfile_id = _drone_manager.get_pidfile_id_from(
+                self._working_directory(), self._pidfile_name())
+        _drone_manager.register_pidfile(pidfile_id)
+
+        paired_pidfile_id = self._paired_with_monitor().pidfile_id
+        if paired_pidfile_id:
+            _drone_manager.register_pidfile(paired_pidfile_id)
+
+
+    def recover(self):
+        if not self._check_paired_results_exist():
+            return
+
+        self._create_monitor()
+        self.monitor.attach_to_existing_process(
+                self._working_directory(), pidfile_name=self._pidfile_name(),
+                num_processes=self.num_processes)
+        if not self.monitor.has_process():
+            # no process to recover; wait to be started normally
+            self.monitor = None
+            return
+
+        self.started = True
+        logging.info('Recovering process %s for %s at %s',
+                     self.monitor.get_process(), type(self).__name__,
+                     self._working_directory())
+
+
+    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
+                                    allowed_host_statuses=None):
+        class_name = self.__class__.__name__
+        for entry in queue_entries:
+            if entry.status not in allowed_hqe_statuses:
+                raise host_scheduler.SchedulerError(
+                        '%s attempting to start entry with invalid status %s: '
+                        '%s' % (class_name, entry.status, entry))
+            invalid_host_status = (
+                    allowed_host_statuses is not None
+                    and entry.host.status not in allowed_host_statuses)
+            if invalid_host_status:
+                raise host_scheduler.SchedulerError(
+                        '%s attempting to start on queue entry with invalid '
+                        'host status %s: %s'
+                        % (class_name, entry.host.status, entry))
+
+
+SiteAgentTask = utils.import_site_class(
+    __file__, 'autotest_lib.scheduler.site_monitor_db',
+    'SiteAgentTask', BaseAgentTask)
+
+class AgentTask(SiteAgentTask):
+    pass
+
+
+class TaskWithJobKeyvals(object):
+    """AgentTask mixin providing functionality to help with job keyval files."""
+    _KEYVAL_FILE = 'keyval'
+    def _format_keyval(self, key, value):
+        return '%s=%s' % (key, value)
+
+
+    def _keyval_path(self):
+        """Subclasses must override this"""
+        raise NotImplementedError
+
+
+    def _write_keyval_after_job(self, field, value):
+        assert self.monitor
+        if not self.monitor.has_process():
+            return
+        _drone_manager.write_lines_to_file(
+            self._keyval_path(), [self._format_keyval(field, value)],
+            paired_with_process=self.monitor.get_process())
+
+
+    def _job_queued_keyval(self, job):
+        return 'job_queued', int(time.mktime(job.created_on.timetuple()))
+
+
+    def _write_job_finished(self):
+        self._write_keyval_after_job("job_finished", int(time.time()))
+
+
+    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
+        keyval_contents = '\n'.join(self._format_keyval(key, value)
+                                    for key, value in keyval_dict.iteritems())
+        # always end with a newline to allow additional keyvals to be written
+        keyval_contents += '\n'
+        _drone_manager.attach_file_to_execution(self._working_directory(),
+                                                keyval_contents,
+                                                file_path=keyval_path)
+
+
+    def _write_keyvals_before_job(self, keyval_dict):
+        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
+
+
+    def _write_host_keyvals(self, host):
+        keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
+                                   host.hostname)
+        platform, all_labels = host.platform_and_labels()
+        all_labels = [ urllib.quote(label) for label in all_labels ]
+        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
+        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
+
+
+class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
+    """
+    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
+    """
+
+    TASK_TYPE = None
+    host = None
+    queue_entry = None
+
+    def __init__(self, task, extra_command_args):
+        super(SpecialAgentTask, self).__init__()
+
+        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
+
+        self.host = scheduler_models.Host(id=task.host.id)
+        self.queue_entry = None
+        if task.queue_entry:
+            self.queue_entry = scheduler_models.HostQueueEntry(
+                    id=task.queue_entry.id)
+
+        self.task = task
+        self._extra_command_args = extra_command_args
+
+
+    def _keyval_path(self):
+        return os.path.join(self._working_directory(), self._KEYVAL_FILE)
+
+
+    def _command_line(self):
+        return autoserv_utils._autoserv_command_line(self.host.hostname,
+                                                     self._extra_command_args,
+                                                     queue_entry=self.queue_entry)
+
+
+    def _working_directory(self):
+        return self.task.execution_path()
+
+
+    @property
+    def owner_username(self):
+        if self.task.requested_by:
+            return self.task.requested_by.login
+        return None
+
+
+    def prolog(self):
+        super(SpecialAgentTask, self).prolog()
+        self.task.activate()
+        self._write_host_keyvals(self.host)
+
+
+    def _fail_queue_entry(self):
+        assert self.queue_entry
+
+        if self.queue_entry.meta_host:
+            return # don't fail metahost entries, they'll be reassigned
+
+        self.queue_entry.update_from_database()
+        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
+            return # entry has been aborted
+
+        self._actually_fail_queue_entry()
+
+
+    # TODO(milleral): http://crbug.com/268607
+    # All this used to be a part of _fail_queue_entry.  The
+    # exact semantics of when one should and should not be failing a queue
+    # entry need to be worked out, because provisioning has placed us in a
+    # case where we want to fail a queue entry that could be requeued,
+    # which makes us fail the two above if statements, and thus
+    # _fail_queue_entry() would exit early and have no effect.
+    # What's left here with _actually_fail_queue_entry is a hack to be able to
+    # bypass the checks and unconditionally execute the code.
+    def _actually_fail_queue_entry(self):
+        self.queue_entry.set_execution_subdir()
+        queued_key, queued_time = self._job_queued_keyval(
+            self.queue_entry.job)
+        self._write_keyval_after_job(queued_key, queued_time)
+        self._write_job_finished()
+
+        # copy results logs into the normal place for job results
+        self.monitor.try_copy_results_on_drone(
+                source_path=self._working_directory() + '/',
+                destination_path=self.queue_entry.execution_path() + '/')
+
+        pidfile_id = _drone_manager.get_pidfile_id_from(
+                self.queue_entry.execution_path(),
+                pidfile_name=drone_manager.AUTOSERV_PID_FILE)
+        _drone_manager.register_pidfile(pidfile_id)
+
+        if self.queue_entry.job.parse_failed_repair:
+            self._parse_results([self.queue_entry])
+        else:
+            self._archive_results([self.queue_entry])
+
+        # Also fail all other special tasks that have not yet run for this HQE
+        pending_tasks = models.SpecialTask.objects.filter(
+                queue_entry__id=self.queue_entry.id,
+                is_complete=0)
+        for task in pending_tasks:
+            task.finish(False)
+
+
+    def cleanup(self):
+        super(SpecialAgentTask, self).cleanup()
+
+        # We will consider an aborted task to be "Failed"
+        self.task.finish(bool(self.success))
+
+        if self.monitor:
+            if self.monitor.has_process():
+                self._copy_results([self.task])
+            if self.monitor.pidfile_id is not None:
+                _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
+
+
+    def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
+        """Remove a type of special task in all tasks, keep last one if needed.
+
+        @param special_task_to_remove: type of special task to be removed, e.g.,
+            models.SpecialTask.Task.VERIFY.
+        @param keep_last_one: True to keep the last special task if its type is
+            the same as of special_task_to_remove.
+
+        """
+        queued_special_tasks = models.SpecialTask.objects.filter(
+            host__id=self.host.id,
+            task=special_task_to_remove,
+            is_active=False, is_complete=False, queue_entry=None)
+        if keep_last_one:
+            queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
+        queued_special_tasks.delete()
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 22f087b..29dc2c1 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -5,9 +5,8 @@
 Autotest scheduler
 """
 
-
 import datetime, optparse, os, signal
-import sys, time, traceback, urllib
+import sys, time
 import logging, gc
 
 import common
@@ -16,24 +15,22 @@
 import django.db
 
 from autotest_lib.client.common_lib import global_config, logging_manager
-from autotest_lib.client.common_lib import host_protections, utils
+from autotest_lib.client.common_lib import utils
 from autotest_lib.database import database_connection
-from autotest_lib.frontend.afe import model_attributes
 from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
-from autotest_lib.scheduler import drone_manager, drones, email_manager
-from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
-from autotest_lib.scheduler import scheduler_logging_config
+from autotest_lib.scheduler import agent_task, drone_manager, drones
+from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
+from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
+from autotest_lib.scheduler import postjob_task, scheduler_logging_config
 from autotest_lib.scheduler import scheduler_models
 from autotest_lib.scheduler import status_server, scheduler_config
 from autotest_lib.server import autoserv_utils
-from autotest_lib.server.cros import provision
 from autotest_lib.site_utils.graphite import stats
 
 BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
 PID_FILE_PREFIX = 'monitor_db'
 
 RESULTS_DIR = '.'
-AUTOSERV_NICE_LEVEL = 10
 DB_CONFIG_SECTION = 'AUTOTEST_WEB'
 AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
 
@@ -54,8 +51,10 @@
 
 _db = None
 _shutdown = False
-_autoserv_directory = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server')
-_autoserv_path = os.path.join(_autoserv_directory, 'autoserv')
+
+# These 2 globals are replaced for testing
+_autoserv_directory = autoserv_utils.autoserv_directory
+_autoserv_path = autoserv_utils.autoserv_path
 _testing_mode = False
 _drone_manager = None
 
@@ -67,13 +66,6 @@
 _parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)
 
 
-def _get_pidfile_timeout_secs():
-    """@returns How long to wait for autoserv to write pidfile."""
-    pidfile_timeout_mins = global_config.global_config.get_config_value(
-            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
-    return pidfile_timeout_mins * 60
-
-
 def _site_init_monitor_db_dummy():
     return {}
 
@@ -502,11 +494,11 @@
                 return HostlessQueueTask(queue_entry=queue_entry)
             return QueueTask(queue_entries=task_entries)
         if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
-            return GatherLogsTask(queue_entries=task_entries)
+            return postjob_task.GatherLogsTask(queue_entries=task_entries)
         if queue_entry.status == models.HostQueueEntry.Status.PARSING:
-            return FinalReparseTask(queue_entries=task_entries)
+            return postjob_task.FinalReparseTask(queue_entries=task_entries)
         if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
-            return ArchiveResultsTask(queue_entries=task_entries)
+            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)
 
         raise host_scheduler.SchedulerError(
                 '_get_agent_task_for_queue_entry got entry with '
@@ -552,8 +544,12 @@
         """
         self._assert_host_has_no_agent(special_task)
 
-        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask,
-                                      ResetTask, ProvisionTask)
+        special_agent_task_classes = (prejob_task.CleanupTask,
+                                      prejob_task.VerifyTask,
+                                      prejob_task.RepairTask,
+                                      prejob_task.ResetTask,
+                                      prejob_task.ProvisionTask)
+
         for agent_task_class in special_agent_task_classes:
             if agent_task_class.TASK_TYPE == special_task.task:
                 return agent_task_class(task=special_task)
@@ -943,7 +939,7 @@
             # through the host and through the hqe. A special task
             # always needs a host, but doesn't always need a hqe.
             for agent in self._host_agents.get(task.host.id, []):
-                if isinstance(agent.task, SpecialAgentTask):
+                if isinstance(agent.task, agent_task.SpecialAgentTask):
 
                     # The epilog preforms critical actions such as
                     # queueing the next SpecialTask, requeuing the
@@ -1093,182 +1089,6 @@
     pass
 
 
-class PidfileRunMonitor(object):
-    """
-    Client must call either run() to start a new process or
-    attach_to_existing_process().
-    """
-
-    class _PidfileException(Exception):
-        """
-        Raised when there's some unexpected behavior with the pid file, but only
-        used internally (never allowed to escape this class).
-        """
-
-
-    def __init__(self):
-        self.lost_process = False
-        self._start_time = None
-        self.pidfile_id = None
-        self._state = drone_manager.PidfileContents()
-
-
-    def _add_nice_command(self, command, nice_level):
-        if not nice_level:
-            return command
-        return ['nice', '-n', str(nice_level)] + command
-
-
-    def _set_start_time(self):
-        self._start_time = time.time()
-
-
-    def run(self, command, working_directory, num_processes, nice_level=None,
-            log_file=None, pidfile_name=None, paired_with_pidfile=None,
-            username=None, drone_hostnames_allowed=None):
-        assert command is not None
-        if nice_level is not None:
-            command = ['nice', '-n', str(nice_level)] + command
-        self._set_start_time()
-        self.pidfile_id = _drone_manager.execute_command(
-            command, working_directory, pidfile_name=pidfile_name,
-            num_processes=num_processes, log_file=log_file,
-            paired_with_pidfile=paired_with_pidfile, username=username,
-            drone_hostnames_allowed=drone_hostnames_allowed)
-
-
-    def attach_to_existing_process(self, execution_path,
-                                   pidfile_name=drone_manager.AUTOSERV_PID_FILE,
-                                   num_processes=None):
-        self._set_start_time()
-        self.pidfile_id = _drone_manager.get_pidfile_id_from(
-            execution_path, pidfile_name=pidfile_name)
-        if num_processes is not None:
-            _drone_manager.declare_process_count(self.pidfile_id, num_processes)
-
-
-    def kill(self):
-        if self.has_process():
-            _drone_manager.kill_process(self.get_process())
-
-
-    def has_process(self):
-        self._get_pidfile_info()
-        return self._state.process is not None
-
-
-    def get_process(self):
-        self._get_pidfile_info()
-        assert self._state.process is not None
-        return self._state.process
-
-
-    def _read_pidfile(self, use_second_read=False):
-        assert self.pidfile_id is not None, (
-            'You must call run() or attach_to_existing_process()')
-        contents = _drone_manager.get_pidfile_contents(
-            self.pidfile_id, use_second_read=use_second_read)
-        if contents.is_invalid():
-            self._state = drone_manager.PidfileContents()
-            raise self._PidfileException(contents)
-        self._state = contents
-
-
-    def _handle_pidfile_error(self, error, message=''):
-        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
-            self._state.process, self.pidfile_id, message)
-        email_manager.manager.enqueue_notify_email(error, message)
-        self.on_lost_process(self._state.process)
-
-
-    def _get_pidfile_info_helper(self):
-        if self.lost_process:
-            return
-
-        self._read_pidfile()
-
-        if self._state.process is None:
-            self._handle_no_process()
-            return
-
-        if self._state.exit_status is None:
-            # double check whether or not autoserv is running
-            if _drone_manager.is_process_running(self._state.process):
-                return
-
-            # pid but no running process - maybe process *just* exited
-            self._read_pidfile(use_second_read=True)
-            if self._state.exit_status is None:
-                # autoserv exited without writing an exit code
-                # to the pidfile
-                self._handle_pidfile_error(
-                    'autoserv died without writing exit code')
-
-
-    def _get_pidfile_info(self):
-        """\
-        After completion, self._state will contain:
-         pid=None, exit_status=None if autoserv has not yet run
-         pid!=None, exit_status=None if autoserv is running
-         pid!=None, exit_status!=None if autoserv has completed
-        """
-        try:
-            self._get_pidfile_info_helper()
-        except self._PidfileException, exc:
-            self._handle_pidfile_error('Pidfile error', traceback.format_exc())
-
-
-    def _handle_no_process(self):
-        """\
-        Called when no pidfile is found or no pid is in the pidfile.
-        """
-        message = 'No pid found at %s' % self.pidfile_id
-        if time.time() - self._start_time > _get_pidfile_timeout_secs():
-            email_manager.manager.enqueue_notify_email(
-                'Process has failed to write pidfile', message)
-            self.on_lost_process()
-
-
-    def on_lost_process(self, process=None):
-        """\
-        Called when autoserv has exited without writing an exit status,
-        or we've timed out waiting for autoserv to write a pid to the
-        pidfile.  In either case, we just return failure and the caller
-        should signal some kind of warning.
-
-        process is unimportant here, as it shouldn't be used by anyone.
-        """
-        self.lost_process = True
-        self._state.process = process
-        self._state.exit_status = 1
-        self._state.num_tests_failed = 0
-
-
-    def exit_code(self):
-        self._get_pidfile_info()
-        return self._state.exit_status
-
-
-    def num_tests_failed(self):
-        """@returns The number of tests that failed or -1 if unknown."""
-        self._get_pidfile_info()
-        if self._state.num_tests_failed is None:
-            return -1
-        return self._state.num_tests_failed
-
-
-    def try_copy_results_on_drone(self, **kwargs):
-        if self.has_process():
-            # copy results logs into the normal place for job results
-            _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
-
-
-    def try_copy_to_results_repository(self, source, **kwargs):
-        if self.has_process():
-            _drone_manager.copy_to_results_repository(self.get_process(),
-                                                      source, **kwargs)
-
-
 class Agent(object):
     """
     An agent for use by the Dispatcher class to perform a task.  An agent wraps
@@ -1326,837 +1146,7 @@
                 self.finished = True
 
 
-class BaseAgentTask(object):
-    class _NullMonitor(object):
-        pidfile_id = None
-
-        def has_process(self):
-            return True
-
-
-    def __init__(self, log_file_name=None):
-        """
-        @param log_file_name: (optional) name of file to log command output to
-        """
-        self.done = False
-        self.started = False
-        self.success = None
-        self.aborted = False
-        self.monitor = None
-        self.queue_entry_ids = []
-        self.host_ids = []
-        self._log_file_name = log_file_name
-
-
-    def _set_ids(self, host=None, queue_entries=None):
-        if queue_entries and queue_entries != [None]:
-            self.host_ids = [entry.host.id for entry in queue_entries]
-            self.queue_entry_ids = [entry.id for entry in queue_entries]
-        else:
-            assert host
-            self.host_ids = [host.id]
-
-
-    def poll(self):
-        if not self.started:
-            self.start()
-        if not self.done:
-            self.tick()
-
-
-    def tick(self):
-        assert self.monitor
-        exit_code = self.monitor.exit_code()
-        if exit_code is None:
-            return
-
-        success = (exit_code == 0)
-        self.finished(success)
-
-
-    def is_done(self):
-        return self.done
-
-
-    def finished(self, success):
-        if self.done:
-            assert self.started
-            return
-        self.started = True
-        self.done = True
-        self.success = success
-        self.epilog()
-
-
-    def prolog(self):
-        """
-        To be overridden.
-        """
-        assert not self.monitor
-        self.register_necessary_pidfiles()
-
-
-    def _log_file(self):
-        if not self._log_file_name:
-            return None
-        return os.path.join(self._working_directory(), self._log_file_name)
-
-
-    def cleanup(self):
-        log_file = self._log_file()
-        if self.monitor and log_file:
-            self.monitor.try_copy_to_results_repository(log_file)
-
-
-    def epilog(self):
-        """
-        To be overridden.
-        """
-        self.cleanup()
-        logging.info("%s finished with success=%s", type(self).__name__,
-                     self.success)
-
-
-    def start(self):
-        if not self.started:
-            self.prolog()
-            self.run()
-
-        self.started = True
-
-
-    def abort(self):
-        if self.monitor:
-            self.monitor.kill()
-        self.done = True
-        self.aborted = True
-        self.cleanup()
-
-
-    def _get_consistent_execution_path(self, execution_entries):
-        first_execution_path = execution_entries[0].execution_path()
-        for execution_entry in execution_entries[1:]:
-            assert execution_entry.execution_path() == first_execution_path, (
-                '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
-                                        execution_entry,
-                                        first_execution_path,
-                                        execution_entries[0]))
-        return first_execution_path
-
-
-    def _copy_results(self, execution_entries, use_monitor=None):
-        """
-        @param execution_entries: list of objects with execution_path() method
-        """
-        if use_monitor is not None and not use_monitor.has_process():
-            return
-
-        assert len(execution_entries) > 0
-        if use_monitor is None:
-            assert self.monitor
-            use_monitor = self.monitor
-        assert use_monitor.has_process()
-        execution_path = self._get_consistent_execution_path(execution_entries)
-        results_path = execution_path + '/'
-        use_monitor.try_copy_to_results_repository(results_path)
-
-
-    def _parse_results(self, queue_entries):
-        for queue_entry in queue_entries:
-            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
-
-
-    def _archive_results(self, queue_entries):
-        for queue_entry in queue_entries:
-            queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
-
-
-    def _command_line(self):
-        """
-        Return the command line to run.  Must be overridden.
-        """
-        raise NotImplementedError
-
-
-    @property
-    def num_processes(self):
-        """
-        Return the number of processes forked by this BaseAgentTask's process.
-        It may only be approximate.  To be overridden if necessary.
-        """
-        return 1
-
-
-    def _paired_with_monitor(self):
-        """
-        If this BaseAgentTask's process must run on the same machine as some
-        previous process, this method should be overridden to return a
-        PidfileRunMonitor for that process.
-        """
-        return self._NullMonitor()
-
-
-    @property
-    def owner_username(self):
-        """
-        Return login of user responsible for this task.  May be None.  Must be
-        overridden.
-        """
-        raise NotImplementedError
-
-
-    def _working_directory(self):
-        """
-        Return the directory where this BaseAgentTask's process executes.
-        Must be overridden.
-        """
-        raise NotImplementedError
-
-
-    def _pidfile_name(self):
-        """
-        Return the name of the pidfile this BaseAgentTask's process uses.  To be
-        overridden if necessary.
-        """
-        return drone_manager.AUTOSERV_PID_FILE
-
-
-    def _check_paired_results_exist(self):
-        if not self._paired_with_monitor().has_process():
-            email_manager.manager.enqueue_notify_email(
-                    'No paired results in task',
-                    'No paired results in task %s at %s'
-                    % (self, self._paired_with_monitor().pidfile_id))
-            self.finished(False)
-            return False
-        return True
-
-
-    def _create_monitor(self):
-        assert not self.monitor
-        self.monitor = PidfileRunMonitor()
-
-
-    def run(self):
-        if not self._check_paired_results_exist():
-            return
-
-        self._create_monitor()
-        self.monitor.run(
-                self._command_line(), self._working_directory(),
-                num_processes=self.num_processes,
-                nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
-                pidfile_name=self._pidfile_name(),
-                paired_with_pidfile=self._paired_with_monitor().pidfile_id,
-                username=self.owner_username,
-                drone_hostnames_allowed=self.get_drone_hostnames_allowed())
-
-
-    def get_drone_hostnames_allowed(self):
-        if not models.DroneSet.drone_sets_enabled():
-            return None
-
-        hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
-        if not hqes:
-            # Only special tasks could be missing host queue entries
-            assert isinstance(self, SpecialAgentTask)
-            return self._user_or_global_default_drone_set(
-                    self.task, self.task.requested_by)
-
-        job_ids = hqes.values_list('job', flat=True).distinct()
-        assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
-                                      "span multiple jobs")
-
-        job = models.Job.objects.get(id=job_ids[0])
-        drone_set = job.drone_set
-        if not drone_set:
-            return self._user_or_global_default_drone_set(job, job.user())
-
-        return drone_set.get_drone_hostnames()
-
-
-    def _user_or_global_default_drone_set(self, obj_with_owner, user):
-        """
-        Returns the user's default drone set, if present.
-
-        Otherwise, returns the global default drone set.
-        """
-        default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
-        if not user:
-            logging.warn('%s had no owner; using default drone set',
-                         obj_with_owner)
-            return default_hostnames
-        if not user.drone_set:
-            logging.warn('User %s has no default drone set, using global '
-                         'default', user.login)
-            return default_hostnames
-        return user.drone_set.get_drone_hostnames()
-
-
-    def register_necessary_pidfiles(self):
-        pidfile_id = _drone_manager.get_pidfile_id_from(
-                self._working_directory(), self._pidfile_name())
-        _drone_manager.register_pidfile(pidfile_id)
-
-        paired_pidfile_id = self._paired_with_monitor().pidfile_id
-        if paired_pidfile_id:
-            _drone_manager.register_pidfile(paired_pidfile_id)
-
-
-    def recover(self):
-        if not self._check_paired_results_exist():
-            return
-
-        self._create_monitor()
-        self.monitor.attach_to_existing_process(
-                self._working_directory(), pidfile_name=self._pidfile_name(),
-                num_processes=self.num_processes)
-        if not self.monitor.has_process():
-            # no process to recover; wait to be started normally
-            self.monitor = None
-            return
-
-        self.started = True
-        logging.info('Recovering process %s for %s at %s',
-                     self.monitor.get_process(), type(self).__name__,
-                     self._working_directory())
-
-
-    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
-                                    allowed_host_statuses=None):
-        class_name = self.__class__.__name__
-        for entry in queue_entries:
-            if entry.status not in allowed_hqe_statuses:
-                raise host_scheduler.SchedulerError(
-                        '%s attempting to start entry with invalid status %s: '
-                        '%s' % (class_name, entry.status, entry))
-            invalid_host_status = (
-                    allowed_host_statuses is not None
-                    and entry.host.status not in allowed_host_statuses)
-            if invalid_host_status:
-                raise host_scheduler.SchedulerError(
-                        '%s attempting to start on queue entry with invalid '
-                        'host status %s: %s'
-                        % (class_name, entry.host.status, entry))
-
-
-SiteAgentTask = utils.import_site_class(
-    __file__, 'autotest_lib.scheduler.site_monitor_db',
-    'SiteAgentTask', BaseAgentTask)
-
-class AgentTask(SiteAgentTask):
-    pass
-
-
-class TaskWithJobKeyvals(object):
-    """AgentTask mixin providing functionality to help with job keyval files."""
-    _KEYVAL_FILE = 'keyval'
-    def _format_keyval(self, key, value):
-        return '%s=%s' % (key, value)
-
-
-    def _keyval_path(self):
-        """Subclasses must override this"""
-        raise NotImplementedError
-
-
-    def _write_keyval_after_job(self, field, value):
-        assert self.monitor
-        if not self.monitor.has_process():
-            return
-        _drone_manager.write_lines_to_file(
-            self._keyval_path(), [self._format_keyval(field, value)],
-            paired_with_process=self.monitor.get_process())
-
-
-    def _job_queued_keyval(self, job):
-        return 'job_queued', int(time.mktime(job.created_on.timetuple()))
-
-
-    def _write_job_finished(self):
-        self._write_keyval_after_job("job_finished", int(time.time()))
-
-
-    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
-        keyval_contents = '\n'.join(self._format_keyval(key, value)
-                                    for key, value in keyval_dict.iteritems())
-        # always end with a newline to allow additional keyvals to be written
-        keyval_contents += '\n'
-        _drone_manager.attach_file_to_execution(self._working_directory(),
-                                                keyval_contents,
-                                                file_path=keyval_path)
-
-
-    def _write_keyvals_before_job(self, keyval_dict):
-        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
-
-
-    def _write_host_keyvals(self, host):
-        keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
-                                   host.hostname)
-        platform, all_labels = host.platform_and_labels()
-        all_labels = [ urllib.quote(label) for label in all_labels ]
-        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
-        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
-
-
-class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
-    """
-    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
-    """
-
-    TASK_TYPE = None
-    host = None
-    queue_entry = None
-
-    def __init__(self, task, extra_command_args):
-        super(SpecialAgentTask, self).__init__()
-
-        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
-
-        self.host = scheduler_models.Host(id=task.host.id)
-        self.queue_entry = None
-        if task.queue_entry:
-            self.queue_entry = scheduler_models.HostQueueEntry(
-                    id=task.queue_entry.id)
-
-        self.task = task
-        self._extra_command_args = extra_command_args
-
-
-    def _keyval_path(self):
-        return os.path.join(self._working_directory(), self._KEYVAL_FILE)
-
-
-    def _command_line(self):
-        return _autoserv_command_line(self.host.hostname,
-                                      self._extra_command_args,
-                                      queue_entry=self.queue_entry)
-
-
-    def _working_directory(self):
-        return self.task.execution_path()
-
-
-    @property
-    def owner_username(self):
-        if self.task.requested_by:
-            return self.task.requested_by.login
-        return None
-
-
-    def prolog(self):
-        super(SpecialAgentTask, self).prolog()
-        self.task.activate()
-        self._write_host_keyvals(self.host)
-
-
-    def _fail_queue_entry(self):
-        assert self.queue_entry
-
-        if self.queue_entry.meta_host:
-            return # don't fail metahost entries, they'll be reassigned
-
-        self.queue_entry.update_from_database()
-        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
-            return # entry has been aborted
-
-        self._actually_fail_queue_entry()
-
-
-    # TODO(milleral): http://crbug.com/268607
-    # All this used to be a part of _fail_queue_entry.  The
-    # exact semantics of when one should and should not be failing a queue
-    # entry need to be worked out, because provisioning has placed us in a
-    # case where we want to fail a queue entry that could be requeued,
-    # which makes us fail the two above if statements, and thus
-    # _fail_queue_entry() would exit early and have no effect.
-    # What's left here with _actually_fail_queue_entry is a hack to be able to
-    # bypass the checks and unconditionally execute the code.
-    def _actually_fail_queue_entry(self):
-        self.queue_entry.set_execution_subdir()
-        queued_key, queued_time = self._job_queued_keyval(
-            self.queue_entry.job)
-        self._write_keyval_after_job(queued_key, queued_time)
-        self._write_job_finished()
-
-        # copy results logs into the normal place for job results
-        self.monitor.try_copy_results_on_drone(
-                source_path=self._working_directory() + '/',
-                destination_path=self.queue_entry.execution_path() + '/')
-
-        pidfile_id = _drone_manager.get_pidfile_id_from(
-                self.queue_entry.execution_path(),
-                pidfile_name=drone_manager.AUTOSERV_PID_FILE)
-        _drone_manager.register_pidfile(pidfile_id)
-
-        if self.queue_entry.job.parse_failed_repair:
-            self._parse_results([self.queue_entry])
-        else:
-            self._archive_results([self.queue_entry])
-
-        # Also fail all other special tasks that have not yet run for this HQE
-        pending_tasks = models.SpecialTask.objects.filter(
-                queue_entry__id=self.queue_entry.id,
-                is_complete=0)
-        for task in pending_tasks:
-            task.finish(False)
-
-
-    def cleanup(self):
-        super(SpecialAgentTask, self).cleanup()
-
-        # We will consider an aborted task to be "Failed"
-        self.task.finish(bool(self.success))
-
-        if self.monitor:
-            if self.monitor.has_process():
-                self._copy_results([self.task])
-            if self.monitor.pidfile_id is not None:
-                _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
-
-
-    def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
-        """Remove a type of special task in all tasks, keep last one if needed.
-
-        @param special_task_to_remove: type of special task to be removed, e.g.,
-            models.SpecialTask.Task.VERIFY.
-        @param keep_last_one: True to keep the last special task if its type is
-            the same as of special_task_to_remove.
-
-        """
-        queued_special_tasks = models.SpecialTask.objects.filter(
-            host__id=self.host.id,
-            task=special_task_to_remove,
-            is_active=False, is_complete=False, queue_entry=None)
-        if keep_last_one:
-            queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
-        queued_special_tasks.delete()
-
-
-class RepairTask(SpecialAgentTask):
-    TASK_TYPE = models.SpecialTask.Task.REPAIR
-
-
-    def __init__(self, task):
-        """\
-        queue_entry: queue entry to mark failed if this repair fails.
-        """
-        protection = host_protections.Protection.get_string(
-                task.host.protection)
-        # normalize the protection name
-        protection = host_protections.Protection.get_attr_name(protection)
-
-        super(RepairTask, self).__init__(
-                task, ['-R', '--host-protection', protection])
-
-        # *don't* include the queue entry in IDs -- if the queue entry is
-        # aborted, we want to leave the repair task running
-        self._set_ids(host=self.host)
-
-
-    def prolog(self):
-        super(RepairTask, self).prolog()
-        logging.info("repair_task starting")
-        self.host.set_status(models.Host.Status.REPAIRING)
-
-
-    def epilog(self):
-        super(RepairTask, self).epilog()
-
-        if self.success:
-            self.host.set_status(models.Host.Status.READY)
-        else:
-            self.host.set_status(models.Host.Status.REPAIR_FAILED)
-            if self.queue_entry:
-                self._fail_queue_entry()
-
-
-class PreJobTask(SpecialAgentTask):
-    def _copy_to_results_repository(self):
-        if not self.queue_entry or self.queue_entry.meta_host:
-            return
-
-        self.queue_entry.set_execution_subdir()
-        log_name = os.path.basename(self.task.execution_path())
-        source = os.path.join(self.task.execution_path(), 'debug',
-                              'autoserv.DEBUG')
-        destination = os.path.join(
-                self.queue_entry.execution_path(), log_name)
-
-        self.monitor.try_copy_to_results_repository(
-                source, destination_path=destination)
-
-
-    def epilog(self):
-        super(PreJobTask, self).epilog()
-
-        if self.success:
-            return
-
-        if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
-            # effectively ignore failure for these hosts
-            self.success = True
-            return
-
-        if self.queue_entry:
-            # If we requeue a HQE, we should cancel any remaining pre-job
-            # tasks against this host, otherwise we'll be left in a state
-            # where a queued HQE has special tasks to run against a host.
-            models.SpecialTask.objects.filter(
-                    queue_entry__id=self.queue_entry.id,
-                    host__id=self.host.id,
-                    is_complete=0).update(is_complete=1, success=0)
-
-            previous_provisions = models.SpecialTask.objects.filter(
-                    task=models.SpecialTask.Task.PROVISION,
-                    queue_entry_id=self.queue_entry.id).count()
-            if (previous_provisions >
-                scheduler_config.config.max_provision_retries):
-                self._actually_fail_queue_entry()
-                # This abort will mark the aborted bit on the HQE itself, to
-                # signify that we're killing it.  Technically it also will do
-                # the recursive aborting of all child jobs, but that shouldn't
-                # matter here, as only suites have children, and those are
-                # hostless and thus don't have provisioning.
-                # TODO(milleral) http://crbug.com/188217
-                # However, we can't actually do this yet, as if we set the
-                # abort bit the FinalReparseTask will set the status of the HQE
-                # to ABORTED, which then means that we don't show the status in
-                # run_suite.  So in the meantime, don't mark the HQE as
-                # aborted.
-                # queue_entry.abort()
-            else:
-                # requeue() must come after handling provision retries, since
-                # _actually_fail_queue_entry needs an execution subdir.
-                # We also don't want to requeue if we hit the provision retry
-                # limit, since then we overwrite the PARSING state of the HQE.
-                self.queue_entry.requeue()
-
-            previous_repairs = models.SpecialTask.objects.filter(
-                    task=models.SpecialTask.Task.REPAIR,
-                    queue_entry_id=self.queue_entry.id).count()
-            if previous_repairs >= scheduler_config.config.max_repair_limit:
-                self.host.set_status(models.Host.Status.REPAIR_FAILED)
-                self._fail_queue_entry()
-                return
-
-            queue_entry = models.HostQueueEntry.objects.get(
-                    id=self.queue_entry.id)
-        else:
-            queue_entry = None
-
-        models.SpecialTask.objects.create(
-                host=models.Host.objects.get(id=self.host.id),
-                task=models.SpecialTask.Task.REPAIR,
-                queue_entry=queue_entry,
-                requested_by=self.task.requested_by)
-
-
-    def _should_pending(self):
-        """
-        Decide if we should call the host queue entry's on_pending method.
-        We should if:
-        1) There exists an associated host queue entry.
-        2) The current special task completed successfully.
-        3) There do not exist any more special tasks to be run before the
-           host queue entry starts.
-
-        @returns: True if we should call pending, false if not.
-
-        """
-        if not self.queue_entry or not self.success:
-            return False
-
-        # We know if this is the last one when we create it, so we could add
-        # another column to the database to keep track of this information, but
-        # I expect the overhead of querying here to be minimal.
-        queue_entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
-        queued = models.SpecialTask.objects.filter(
-                host__id=self.host.id, is_active=False,
-                is_complete=False, queue_entry=queue_entry)
-        queued = queued.exclude(id=self.task.id)
-        return queued.count() == 0
-
-
-class VerifyTask(PreJobTask):
-    TASK_TYPE = models.SpecialTask.Task.VERIFY
-
-
-    def __init__(self, task):
-        super(VerifyTask, self).__init__(task, ['-v'])
-        self._set_ids(host=self.host, queue_entries=[self.queue_entry])
-
-
-    def prolog(self):
-        super(VerifyTask, self).prolog()
-
-        logging.info("starting verify on %s", self.host.hostname)
-        if self.queue_entry:
-            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
-        self.host.set_status(models.Host.Status.VERIFYING)
-
-        # Delete any queued manual reverifies for this host.  One verify will do
-        # and there's no need to keep records of other requests.
-        self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
-                                  keep_last_one=True)
-
-
-    def epilog(self):
-        super(VerifyTask, self).epilog()
-        if self.success:
-            if self._should_pending():
-                self.queue_entry.on_pending()
-            else:
-                self.host.set_status(models.Host.Status.READY)
-
-
-class CleanupTask(PreJobTask):
-    # note this can also run post-job, but when it does, it's running standalone
-    # against the host (not related to the job), so it's not considered a
-    # PostJobTask
-
-    TASK_TYPE = models.SpecialTask.Task.CLEANUP
-
-
-    def __init__(self, task, recover_run_monitor=None):
-        super(CleanupTask, self).__init__(task, ['--cleanup'])
-        self._set_ids(host=self.host, queue_entries=[self.queue_entry])
-
-
-    def prolog(self):
-        super(CleanupTask, self).prolog()
-        logging.info("starting cleanup task for host: %s", self.host.hostname)
-        self.host.set_status(models.Host.Status.CLEANING)
-        if self.queue_entry:
-            self.queue_entry.set_status(models.HostQueueEntry.Status.CLEANING)
-
-
-    def _finish_epilog(self):
-        if not self.queue_entry or not self.success:
-            return
-
-        do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
-        should_run_verify = (
-                self.queue_entry.job.run_verify
-                and self.host.protection != do_not_verify_protection)
-        if should_run_verify:
-            entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
-            models.SpecialTask.objects.create(
-                    host=models.Host.objects.get(id=self.host.id),
-                    queue_entry=entry,
-                    task=models.SpecialTask.Task.VERIFY)
-        else:
-            if self._should_pending():
-                self.queue_entry.on_pending()
-
-
-    def epilog(self):
-        super(CleanupTask, self).epilog()
-
-        if self.success:
-            self.host.update_field('dirty', 0)
-            self.host.set_status(models.Host.Status.READY)
-
-        self._finish_epilog()
-
-
-class ResetTask(PreJobTask):
-    """Task to reset a DUT, including cleanup and verify."""
-    # note this can also run post-job, but when it does, it's running standalone
-    # against the host (not related to the job), so it's not considered a
-    # PostJobTask
-
-    TASK_TYPE = models.SpecialTask.Task.RESET
-
-
-    def __init__(self, task, recover_run_monitor=None):
-        super(ResetTask, self).__init__(task, ['--reset'])
-        self._set_ids(host=self.host, queue_entries=[self.queue_entry])
-
-
-    def prolog(self):
-        super(ResetTask, self).prolog()
-        logging.info('starting reset task for host: %s',
-                     self.host.hostname)
-        self.host.set_status(models.Host.Status.RESETTING)
-        if self.queue_entry:
-            self.queue_entry.set_status(models.HostQueueEntry.Status.RESETTING)
-
-        # Delete any queued cleanups for this host.
-        self.remove_special_tasks(models.SpecialTask.Task.CLEANUP,
-                                  keep_last_one=False)
-
-        # Delete any queued reverifies for this host.
-        self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
-                                  keep_last_one=False)
-
-        # Only one reset is needed.
-        self.remove_special_tasks(models.SpecialTask.Task.RESET,
-                                  keep_last_one=True)
-
-
-    def epilog(self):
-        super(ResetTask, self).epilog()
-
-        if self.success:
-            self.host.update_field('dirty', 0)
-
-            if self._should_pending():
-                self.queue_entry.on_pending()
-            else:
-                self.host.set_status(models.Host.Status.READY)
-
-
-class ProvisionTask(PreJobTask):
-    TASK_TYPE = models.SpecialTask.Task.PROVISION
-
-    def __init__(self, task):
-        # Provisioning requires that we be associated with a job/queue entry
-        assert task.queue_entry, "No HQE associated with provision task!"
-        # task.queue_entry is an afe model HostQueueEntry object.
-        # self.queue_entry is a scheduler models HostQueueEntry object, but
-        # it gets constructed and assigned in __init__, so it's not available
-        # yet.  Therefore, we're stuck pulling labels off of the afe model
-        # so that we can pass the --provision args into the __init__ call.
-        labels = {x.name for x in task.queue_entry.job.dependency_labels.all()}
-        _, provisionable = provision.filter_labels(labels)
-        extra_command_args = ['--provision', ','.join(provisionable)]
-        super(ProvisionTask, self).__init__(task, extra_command_args)
-        self._set_ids(host=self.host, queue_entries=[self.queue_entry])
-
-
-    def _command_line(self):
-        # If we give queue_entry to _autoserv_command_line, then it will append
-        # -c for this invocation if the queue_entry is a client side test. We
-        # don't want that, as it messes with provisioning, so we just drop it
-        # from the arguments here.
-        # Note that we also don't verify job_repo_url as provisioining tasks are
-        # required to stage whatever content we need, and the job itself will
-        # force autotest to be staged if it isn't already.
-        return _autoserv_command_line(self.host.hostname,
-                                      self._extra_command_args)
-
-
-    def prolog(self):
-        super(ProvisionTask, self).prolog()
-        # add check for previous provision task and abort if exist.
-        logging.info("starting provision task for host: %s", self.host.hostname)
-        self.queue_entry.set_status(
-                models.HostQueueEntry.Status.PROVISIONING)
-        self.host.set_status(models.Host.Status.PROVISIONING)
-
-
-    def epilog(self):
-        super(ProvisionTask, self).epilog()
-
-        if self._should_pending():
-            self.queue_entry.on_pending()
-        else:
-            self.host.set_status(models.Host.Status.READY)
-
-
-class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
+class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
     """
     Common functionality for QueueTask and HostlessQueueTask
     """
@@ -2341,368 +1331,5 @@
         self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
 
 
-class PostJobTask(AgentTask):
-    def __init__(self, queue_entries, log_file_name):
-        super(PostJobTask, self).__init__(log_file_name=log_file_name)
-
-        self.queue_entries = queue_entries
-
-        self._autoserv_monitor = PidfileRunMonitor()
-        self._autoserv_monitor.attach_to_existing_process(
-                self._working_directory())
-
-
-    def _command_line(self):
-        if _testing_mode:
-            return 'true'
-        return self._generate_command(
-                _drone_manager.absolute_path(self._working_directory()))
-
-
-    def _generate_command(self, results_dir):
-        raise NotImplementedError('Subclasses must override this')
-
-
-    @property
-    def owner_username(self):
-        return self.queue_entries[0].job.owner
-
-
-    def _working_directory(self):
-        return self._get_consistent_execution_path(self.queue_entries)
-
-
-    def _paired_with_monitor(self):
-        return self._autoserv_monitor
-
-
-    def _job_was_aborted(self):
-        was_aborted = None
-        for queue_entry in self.queue_entries:
-            queue_entry.update_from_database()
-            if was_aborted is None: # first queue entry
-                was_aborted = bool(queue_entry.aborted)
-            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
-                entries = ['%s (aborted: %s)' % (entry, entry.aborted)
-                           for entry in self.queue_entries]
-                email_manager.manager.enqueue_notify_email(
-                        'Inconsistent abort state',
-                        'Queue entries have inconsistent abort state:\n' +
-                        '\n'.join(entries))
-                # don't crash here, just assume true
-                return True
-        return was_aborted
-
-
-    def _final_status(self):
-        if self._job_was_aborted():
-            return models.HostQueueEntry.Status.ABORTED
-
-        # we'll use a PidfileRunMonitor to read the autoserv exit status
-        if self._autoserv_monitor.exit_code() == 0:
-            return models.HostQueueEntry.Status.COMPLETED
-        return models.HostQueueEntry.Status.FAILED
-
-
-    def _set_all_statuses(self, status):
-        for queue_entry in self.queue_entries:
-            queue_entry.set_status(status)
-
-
-    def abort(self):
-        # override AgentTask.abort() to avoid killing the process and ending
-        # the task.  post-job tasks continue when the job is aborted.
-        pass
-
-
-    def _pidfile_label(self):
-        # '.autoserv_execute' -> 'autoserv'
-        return self._pidfile_name()[1:-len('_execute')]
-
-
-class GatherLogsTask(PostJobTask):
-    """
-    Task responsible for
-    * gathering uncollected logs (if Autoserv crashed hard or was killed)
-    * copying logs to the results repository
-    * spawning CleanupTasks for hosts, if necessary
-    * spawning a FinalReparseTask for the job
-    """
-    def __init__(self, queue_entries, recover_run_monitor=None):
-        self._job = queue_entries[0].job
-        super(GatherLogsTask, self).__init__(
-            queue_entries, log_file_name='.collect_crashinfo.log')
-        self._set_ids(queue_entries=queue_entries)
-
-
-    # TODO: Refactor into autoserv_utils. crbug.com/243090
-    def _generate_command(self, results_dir):
-        host_list = ','.join(queue_entry.host.hostname
-                             for queue_entry in self.queue_entries)
-        return [_autoserv_path , '-p',
-                '--pidfile-label=%s' % self._pidfile_label(),
-                '--use-existing-results', '--collect-crashinfo',
-                '-m', host_list, '-r', results_dir]
-
-
-    @property
-    def num_processes(self):
-        return len(self.queue_entries)
-
-
-    def _pidfile_name(self):
-        return drone_manager.CRASHINFO_PID_FILE
-
-
-    def prolog(self):
-        self._check_queue_entry_statuses(
-                self.queue_entries,
-                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
-                allowed_host_statuses=(models.Host.Status.RUNNING,))
-
-        super(GatherLogsTask, self).prolog()
-
-
-    def epilog(self):
-        super(GatherLogsTask, self).epilog()
-        self._parse_results(self.queue_entries)
-        self._reboot_hosts()
-
-
-    def _reboot_hosts(self):
-        if self._autoserv_monitor.has_process():
-            final_success = (self._final_status() ==
-                             models.HostQueueEntry.Status.COMPLETED)
-            num_tests_failed = self._autoserv_monitor.num_tests_failed()
-        else:
-            final_success = False
-            num_tests_failed = 0
-        reboot_after = self._job.reboot_after
-        do_reboot = (
-                # always reboot after aborted jobs
-                self._final_status() == models.HostQueueEntry.Status.ABORTED
-                or reboot_after == model_attributes.RebootAfter.ALWAYS
-                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
-                    and final_success and num_tests_failed == 0)
-                or num_tests_failed > 0)
-
-        for queue_entry in self.queue_entries:
-            if do_reboot:
-                # don't pass the queue entry to the CleanupTask. if the cleanup
-                # fails, the job doesn't care -- it's over.
-                models.SpecialTask.objects.create(
-                        host=models.Host.objects.get(id=queue_entry.host.id),
-                        task=models.SpecialTask.Task.CLEANUP,
-                        requested_by=self._job.owner_model())
-            else:
-                queue_entry.host.set_status(models.Host.Status.READY)
-
-
-    def run(self):
-        autoserv_exit_code = self._autoserv_monitor.exit_code()
-        # only run if Autoserv exited due to some signal. if we have no exit
-        # code, assume something bad (and signal-like) happened.
-        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
-            super(GatherLogsTask, self).run()
-        else:
-            self.finished(True)
-
-
-class SelfThrottledPostJobTask(PostJobTask):
-    """
-    Special AgentTask subclass that maintains its own global process limit.
-    """
-    _num_running_processes = 0
-    # Last known limit of max processes, used to check whether
-    # max processes config has been changed.
-    _last_known_max_processes = 0
-    # Whether an email should be sent to notifiy process limit being hit.
-    _notification_on = True
-    # Once process limit is hit, an email will be sent.
-    # To prevent spams, do not send another email until
-    # it drops to lower than the following level.
-    REVIVE_NOTIFICATION_THRESHOLD = 0.80
-
-
-    @classmethod
-    def _increment_running_processes(cls):
-        cls._num_running_processes += 1
-        stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
-                                      cls._num_running_processes)
-
-
-    @classmethod
-    def _decrement_running_processes(cls):
-        cls._num_running_processes -= 1
-        stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
-                                      cls._num_running_processes)
-
-
-    @classmethod
-    def _max_processes(cls):
-        raise NotImplementedError
-
-
-    @classmethod
-    def _can_run_new_process(cls):
-        return cls._num_running_processes < cls._max_processes()
-
-
-    def _process_started(self):
-        return bool(self.monitor)
-
-
-    def tick(self):
-        # override tick to keep trying to start until the process count goes
-        # down and we can, at which point we revert to default behavior
-        if self._process_started():
-            super(SelfThrottledPostJobTask, self).tick()
-        else:
-            self._try_starting_process()
-
-
-    def run(self):
-        # override run() to not actually run unless we can
-        self._try_starting_process()
-
-
-    @classmethod
-    def _notify_process_limit_hit(cls):
-        """Send an email to notify that process limit is hit."""
-        if cls._notification_on:
-            subject = '%s: hitting max process limit.' % cls.__name__
-            message = ('Running processes/Max processes: %d/%d'
-                       % (cls._num_running_processes, cls._max_processes()))
-            email_manager.manager.enqueue_notify_email(subject, message)
-            cls._notification_on = False
-
-
-    @classmethod
-    def _reset_notification_switch_if_necessary(cls):
-        """Reset _notification_on if necessary.
-
-        Set _notification_on to True on the following cases:
-        1) If the limit of max processes configuration changes;
-        2) If _notification_on is False and the number of running processes
-           drops to lower than a level defined in REVIVE_NOTIFICATION_THRESHOLD.
-
-        """
-        if cls._last_known_max_processes != cls._max_processes():
-            cls._notification_on = True
-            cls._last_known_max_processes = cls._max_processes()
-            return
-        percentage = float(cls._num_running_processes) / cls._max_processes()
-        if (not cls._notification_on and
-            percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
-            cls._notification_on = True
-
-
-    def _try_starting_process(self):
-        self._reset_notification_switch_if_necessary()
-        if not self._can_run_new_process():
-            self._notify_process_limit_hit()
-            return
-
-        # actually run the command
-        super(SelfThrottledPostJobTask, self).run()
-        if self._process_started():
-            self._increment_running_processes()
-
-
-    def finished(self, success):
-        super(SelfThrottledPostJobTask, self).finished(success)
-        if self._process_started():
-            self._decrement_running_processes()
-
-
-class FinalReparseTask(SelfThrottledPostJobTask):
-    def __init__(self, queue_entries):
-        super(FinalReparseTask, self).__init__(queue_entries,
-                                               log_file_name='.parse.log')
-        # don't use _set_ids, since we don't want to set the host_ids
-        self.queue_entry_ids = [entry.id for entry in queue_entries]
-
-
-    def _generate_command(self, results_dir):
-        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
-                results_dir]
-
-
-    @property
-    def num_processes(self):
-        return 0 # don't include parser processes in accounting
-
-
-    def _pidfile_name(self):
-        return drone_manager.PARSER_PID_FILE
-
-
-    @classmethod
-    def _max_processes(cls):
-        return scheduler_config.config.max_parse_processes
-
-
-    def prolog(self):
-        self._check_queue_entry_statuses(
-                self.queue_entries,
-                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
-
-        super(FinalReparseTask, self).prolog()
-
-
-    def epilog(self):
-        super(FinalReparseTask, self).epilog()
-        self._archive_results(self.queue_entries)
-
-
-class ArchiveResultsTask(SelfThrottledPostJobTask):
-    _ARCHIVING_FAILED_FILE = '.archiver_failed'
-
-    def __init__(self, queue_entries):
-        super(ArchiveResultsTask, self).__init__(queue_entries,
-                                                 log_file_name='.archiving.log')
-        # don't use _set_ids, since we don't want to set the host_ids
-        self.queue_entry_ids = [entry.id for entry in queue_entries]
-
-
-    def _pidfile_name(self):
-        return drone_manager.ARCHIVER_PID_FILE
-
-
-    # TODO: Refactor into autoserv_utils. crbug.com/243090
-    def _generate_command(self, results_dir):
-        return [_autoserv_path , '-p',
-                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
-                '--use-existing-results', '--control-filename=control.archive',
-                os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
-                             'archive_results.control.srv')]
-
-
-    @classmethod
-    def _max_processes(cls):
-        return scheduler_config.config.max_transfer_processes
-
-
-    def prolog(self):
-        self._check_queue_entry_statuses(
-                self.queue_entries,
-                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
-
-        super(ArchiveResultsTask, self).prolog()
-
-
-    def epilog(self):
-        super(ArchiveResultsTask, self).epilog()
-        if not self.success and self._paired_with_monitor().has_process():
-            failed_file = os.path.join(self._working_directory(),
-                                       self._ARCHIVING_FAILED_FILE)
-            paired_process = self._paired_with_monitor().get_process()
-            _drone_manager.write_lines_to_file(
-                    failed_file, ['Archiving failed with exit code %s'
-                                  % self.monitor.exit_code()],
-                    paired_with_process=paired_process)
-        self._set_all_statuses(self._final_status())
-
-
 if __name__ == '__main__':
     main()
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py
index 016f264..b4cfcda 100755
--- a/scheduler/monitor_db_unittest.py
+++ b/scheduler/monitor_db_unittest.py
@@ -9,7 +9,9 @@
 from autotest_lib.client.common_lib.test_utils import unittest
 from autotest_lib.database import database_connection
 from autotest_lib.frontend.afe import models
+from autotest_lib.scheduler import agent_task
 from autotest_lib.scheduler import monitor_db, drone_manager, email_manager
+from autotest_lib.scheduler import pidfile_monitor
 from autotest_lib.scheduler import scheduler_config, gc_stats, host_scheduler
 from autotest_lib.scheduler import monitor_db_functional_test
 from autotest_lib.scheduler import scheduler_models
@@ -831,10 +833,10 @@
         self.god = mock.mock_god()
         self.mock_drone_manager = self.god.create_mock_class(
             drone_manager.DroneManager, 'drone_manager')
-        self.god.stub_with(monitor_db, '_drone_manager',
+        self.god.stub_with(pidfile_monitor, '_drone_manager',
                            self.mock_drone_manager)
         self.god.stub_function(email_manager.manager, 'enqueue_notify_email')
-        self.god.stub_with(monitor_db, '_get_pidfile_timeout_secs',
+        self.god.stub_with(pidfile_monitor, '_get_pidfile_timeout_secs',
                            self._mock_get_pidfile_timeout_secs)
 
         self.pidfile_id = object()
@@ -844,7 +846,7 @@
                           pidfile_name=drone_manager.AUTOSERV_PID_FILE)
              .and_return(self.pidfile_id))
 
-        self.monitor = monitor_db.PidfileRunMonitor()
+        self.monitor = pidfile_monitor.PidfileRunMonitor()
         self.monitor.attach_to_existing_process(self.execution_tag)
 
     def tearDown(self):
@@ -931,7 +933,7 @@
         self.mock_drone_manager.get_pidfile_contents.expect_call(
             self.pidfile_id, use_second_read=False).and_return(
             drone_manager.InvalidPidfile('error'))
-        self.assertRaises(monitor_db.PidfileRunMonitor._PidfileException,
+        self.assertRaises(pidfile_monitor.PidfileRunMonitor._PidfileException,
                           self.monitor._read_pidfile)
         self.god.check_playback()
 
@@ -995,7 +997,7 @@
         email_manager.manager.enqueue_notify_email.expect_call(
             mock.is_string_comparator(), mock.is_string_comparator())
         self.monitor._start_time = (time.time() -
-                                    monitor_db._get_pidfile_timeout_secs() - 1)
+                                    pidfile_monitor._get_pidfile_timeout_secs() - 1)
         self._test_get_pidfile_info_helper(None, 1, 0)
         self.assertTrue(self.monitor.lost_process)
 
@@ -1012,7 +1014,7 @@
 
 
     def _create_mock_task(self, name):
-        task = self.god.create_mock_class(monitor_db.AgentTask, name)
+        task = self.god.create_mock_class(agent_task.AgentTask, name)
         task.num_processes = 1
         _set_host_and_qe_ids(task)
         return task
@@ -1414,7 +1416,7 @@
         hqe_3 = job_3.hostqueueentry_set.all()[0]
         hqe_4 = job_4.hostqueueentry_set.all()[0]
 
-        return (hqe_1, hqe_2, hqe_3, hqe_4), monitor_db.AgentTask()
+        return (hqe_1, hqe_2, hqe_3, hqe_4), agent_task.AgentTask()
 
 
     def test_get_drone_hostnames_allowed_no_drones_in_set(self):
@@ -1458,7 +1460,7 @@
         class MockSpecialTask(object):
             requested_by = object()
 
-        class MockSpecialAgentTask(monitor_db.SpecialAgentTask):
+        class MockSpecialAgentTask(agent_task.SpecialAgentTask):
             task = MockSpecialTask()
             queue_entry_ids = []
             def __init__(self, *args, **kwargs):
@@ -1498,7 +1500,7 @@
 
         self._setup_test_user_or_global_default_drone_set()
 
-        actual = monitor_db.AgentTask()._user_or_global_default_drone_set(
+        actual = agent_task.AgentTask()._user_or_global_default_drone_set(
                 None, MockUser())
 
         self.assertEqual(expected, actual)
@@ -1507,7 +1509,7 @@
 
     def test_user_or_global_default_drone_set_no_user(self):
         expected = self._setup_test_user_or_global_default_drone_set()
-        actual = monitor_db.AgentTask()._user_or_global_default_drone_set(
+        actual = agent_task.AgentTask()._user_or_global_default_drone_set(
                 None, None)
 
         self.assertEqual(expected, actual)
@@ -1520,7 +1522,7 @@
             login = None
 
         expected = self._setup_test_user_or_global_default_drone_set()
-        actual = monitor_db.AgentTask()._user_or_global_default_drone_set(
+        actual = agent_task.AgentTask()._user_or_global_default_drone_set(
                 None, MockUser())
 
         self.assertEqual(expected, actual)
diff --git a/scheduler/pidfile_monitor.py b/scheduler/pidfile_monitor.py
new file mode 100644
index 0000000..72d4c79
--- /dev/null
+++ b/scheduler/pidfile_monitor.py
@@ -0,0 +1,196 @@
+#pylint: disable-msg=C0111
+
+"""
+Pidfile monitor.
+"""
+
+import time, traceback
+from autotest_lib.client.common_lib import global_config
+from autotest_lib.scheduler import drone_manager, email_manager
+from autotest_lib.scheduler import scheduler_config
+
+
+_drone_manager = drone_manager.instance()
+
+def _get_pidfile_timeout_secs():
+    """@returns How long to wait for autoserv to write pidfile."""
+    pidfile_timeout_mins = global_config.global_config.get_config_value(
+            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
+    return pidfile_timeout_mins * 60
+
+
+class PidfileRunMonitor(object):
+    """
+    Client must call either run() to start a new process or
+    attach_to_existing_process().
+    """
+
+    class _PidfileException(Exception):
+        """
+        Raised when there's some unexpected behavior with the pid file, but only
+        used internally (never allowed to escape this class).
+        """
+
+
+    def __init__(self):
+        self.lost_process = False
+        self._start_time = None
+        self.pidfile_id = None
+        self._state = drone_manager.PidfileContents()
+
+
+    def _add_nice_command(self, command, nice_level):
+        if not nice_level:
+            return command
+        return ['nice', '-n', str(nice_level)] + command
+
+
+    def _set_start_time(self):
+        self._start_time = time.time()
+
+
+    def run(self, command, working_directory, num_processes, nice_level=None,
+            log_file=None, pidfile_name=None, paired_with_pidfile=None,
+            username=None, drone_hostnames_allowed=None):
+        assert command is not None
+        if nice_level is not None:
+            command = ['nice', '-n', str(nice_level)] + command
+        self._set_start_time()
+        self.pidfile_id = _drone_manager.execute_command(
+            command, working_directory, pidfile_name=pidfile_name,
+            num_processes=num_processes, log_file=log_file,
+            paired_with_pidfile=paired_with_pidfile, username=username,
+            drone_hostnames_allowed=drone_hostnames_allowed)
+
+
+    def attach_to_existing_process(self, execution_path,
+                                   pidfile_name=drone_manager.AUTOSERV_PID_FILE,
+                                   num_processes=None):
+        self._set_start_time()
+        self.pidfile_id = _drone_manager.get_pidfile_id_from(
+            execution_path, pidfile_name=pidfile_name)
+        if num_processes is not None:
+            _drone_manager.declare_process_count(self.pidfile_id, num_processes)
+
+
+    def kill(self):
+        if self.has_process():
+            _drone_manager.kill_process(self.get_process())
+
+
+    def has_process(self):
+        self._get_pidfile_info()
+        return self._state.process is not None
+
+
+    def get_process(self):
+        self._get_pidfile_info()
+        assert self._state.process is not None
+        return self._state.process
+
+
+    def _read_pidfile(self, use_second_read=False):
+        assert self.pidfile_id is not None, (
+            'You must call run() or attach_to_existing_process()')
+        contents = _drone_manager.get_pidfile_contents(
+            self.pidfile_id, use_second_read=use_second_read)
+        if contents.is_invalid():
+            self._state = drone_manager.PidfileContents()
+            raise self._PidfileException(contents)
+        self._state = contents
+
+
+    def _handle_pidfile_error(self, error, message=''):
+        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
+            self._state.process, self.pidfile_id, message)
+        email_manager.manager.enqueue_notify_email(error, message)
+        self.on_lost_process(self._state.process)
+
+
+    def _get_pidfile_info_helper(self):
+        if self.lost_process:
+            return
+
+        self._read_pidfile()
+
+        if self._state.process is None:
+            self._handle_no_process()
+            return
+
+        if self._state.exit_status is None:
+            # double check whether or not autoserv is running
+            if _drone_manager.is_process_running(self._state.process):
+                return
+
+            # pid but no running process - maybe process *just* exited
+            self._read_pidfile(use_second_read=True)
+            if self._state.exit_status is None:
+                # autoserv exited without writing an exit code
+                # to the pidfile
+                self._handle_pidfile_error(
+                    'autoserv died without writing exit code')
+
+
+    def _get_pidfile_info(self):
+        """\
+        After completion, self._state will contain:
+         pid=None, exit_status=None if autoserv has not yet run
+         pid!=None, exit_status=None if autoserv is running
+         pid!=None, exit_status!=None if autoserv has completed
+        """
+        try:
+            self._get_pidfile_info_helper()
+        except self._PidfileException, exc:
+            self._handle_pidfile_error('Pidfile error', traceback.format_exc())
+
+
+    def _handle_no_process(self):
+        """\
+        Called when no pidfile is found or no pid is in the pidfile.
+        """
+        message = 'No pid found at %s' % self.pidfile_id
+        if time.time() - self._start_time > _get_pidfile_timeout_secs():
+            email_manager.manager.enqueue_notify_email(
+                'Process has failed to write pidfile', message)
+            self.on_lost_process()
+
+
+    def on_lost_process(self, process=None):
+        """\
+        Called when autoserv has exited without writing an exit status,
+        or we've timed out waiting for autoserv to write a pid to the
+        pidfile.  In either case, we just return failure and the caller
+        should signal some kind of warning.
+
+        process is unimportant here, as it shouldn't be used by anyone.
+        """
+        self.lost_process = True
+        self._state.process = process
+        self._state.exit_status = 1
+        self._state.num_tests_failed = 0
+
+
+    def exit_code(self):
+        self._get_pidfile_info()
+        return self._state.exit_status
+
+
+    def num_tests_failed(self):
+        """@returns The number of tests that failed or -1 if unknown."""
+        self._get_pidfile_info()
+        if self._state.num_tests_failed is None:
+            return -1
+        return self._state.num_tests_failed
+
+
+    def try_copy_results_on_drone(self, **kwargs):
+        if self.has_process():
+            # copy results logs into the normal place for job results
+            _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
+
+
+    def try_copy_to_results_repository(self, source, **kwargs):
+        if self.has_process():
+            _drone_manager.copy_to_results_repository(self.get_process(),
+                                                      source, **kwargs)
+
diff --git a/scheduler/postjob_task.py b/scheduler/postjob_task.py
new file mode 100644
index 0000000..4382093
--- /dev/null
+++ b/scheduler/postjob_task.py
@@ -0,0 +1,390 @@
+#pylint: disable-msg=C0111
+
+"""
+Postjob task.
+
+Postjob tasks are responsible for setting the final status of the HQE
+and Host, and scheduling additional special agents such as cleanup,
+if necessary.
+"""
+
+import os
+
+from autotest_lib.frontend.afe import models, model_attributes
+from autotest_lib.scheduler import agent_task, drones, drone_manager
+from autotest_lib.scheduler import email_manager, pidfile_monitor
+from autotest_lib.scheduler import scheduler_config
+from autotest_lib.server import autoserv_utils
+from autotest_lib.site_utils.graphite import stats
+
+
+_drone_manager = drone_manager.instance()
+_parser_path = autoserv_utils._parser_path_func(
+                autoserv_utils.AUTOTEST_INSTALL_DIR)
+
+
+class PostJobTask(agent_task.AgentTask):
+    def __init__(self, queue_entries, log_file_name):
+        super(PostJobTask, self).__init__(log_file_name=log_file_name)
+
+        self.queue_entries = queue_entries
+
+        self._autoserv_monitor = pidfile_monitor.PidfileRunMonitor()
+        self._autoserv_monitor.attach_to_existing_process(
+                self._working_directory())
+
+
+    def _command_line(self):
+        # Do we need testing_mode?
+        return self._generate_command(
+                _drone_manager.absolute_path(self._working_directory()))
+
+
+    def _generate_command(self, results_dir):
+        raise NotImplementedError('Subclasses must override this')
+
+
+    @property
+    def owner_username(self):
+        return self.queue_entries[0].job.owner
+
+
+    def _working_directory(self):
+        return self._get_consistent_execution_path(self.queue_entries)
+
+
+    def _paired_with_monitor(self):
+        return self._autoserv_monitor
+
+
+    def _job_was_aborted(self):
+        was_aborted = None
+        for queue_entry in self.queue_entries:
+            queue_entry.update_from_database()
+            if was_aborted is None: # first queue entry
+                was_aborted = bool(queue_entry.aborted)
+            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
+                entries = ['%s (aborted: %s)' % (entry, entry.aborted)
+                           for entry in self.queue_entries]
+                email_manager.manager.enqueue_notify_email(
+                        'Inconsistent abort state',
+                        'Queue entries have inconsistent abort state:\n' +
+                        '\n'.join(entries))
+                # don't crash here, just assume true
+                return True
+        return was_aborted
+
+
+    def _final_status(self):
+        if self._job_was_aborted():
+            return models.HostQueueEntry.Status.ABORTED
+
+        # we'll use a PidfileRunMonitor to read the autoserv exit status
+        if self._autoserv_monitor.exit_code() == 0:
+            return models.HostQueueEntry.Status.COMPLETED
+        return models.HostQueueEntry.Status.FAILED
+
+
+    def _set_all_statuses(self, status):
+        for queue_entry in self.queue_entries:
+            queue_entry.set_status(status)
+
+
+    def abort(self):
+        # override AgentTask.abort() to avoid killing the process and ending
+        # the task.  post-job tasks continue when the job is aborted.
+        pass
+
+
+    def _pidfile_label(self):
+        # '.autoserv_execute' -> 'autoserv'
+        return self._pidfile_name()[1:-len('_execute')]
+
+
+class SelfThrottledPostJobTask(PostJobTask):
+    """
+    PostJobTask that maintains its own process limit.
+
+    We throttle tasks like parsing because we don't want them to
+    hold up tests. At the same time we don't want to build up a backlog
+    of results that will take forever to parse.
+    """
+    _num_running_processes = 0
+    # Last known limit of max processes, used to check whether
+    # max processes config has been changed.
+    _last_known_max_processes = 0
+    # Whether an email should be sent when the process limit is hit.
+    _notification_on = True
+    # Once the process limit is hit, an email will be sent.
+    # To prevent spam, do not send another email until the process
+    # count drops below the following level.
+    REVIVE_NOTIFICATION_THRESHOLD = 0.80
+
+
+    @classmethod
+    def _increment_running_processes(cls):
+        cls._num_running_processes += 1
+        stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
+                                      cls._num_running_processes)
+
+
+    @classmethod
+    def _decrement_running_processes(cls):
+        cls._num_running_processes -= 1
+        stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
+                                      cls._num_running_processes)
+
+
+    @classmethod
+    def _max_processes(cls):
+        raise NotImplementedError
+
+
+    @classmethod
+    def _can_run_new_process(cls):
+        return cls._num_running_processes < cls._max_processes()
+
+
+    def _process_started(self):
+        return bool(self.monitor)
+
+
+    def tick(self):
+        # override tick to keep trying to start until the process count goes
+        # down and we can, at which point we revert to default behavior
+        if self._process_started():
+            super(SelfThrottledPostJobTask, self).tick()
+        else:
+            self._try_starting_process()
+
+
+    def run(self):
+        # override run() to not actually run unless we can
+        self._try_starting_process()
+
+
+    @classmethod
+    def _notify_process_limit_hit(cls):
+        """Send an email to notify that process limit is hit."""
+        if cls._notification_on:
+            subject = '%s: hitting max process limit.' % cls.__name__
+            message = ('Running processes/Max processes: %d/%d'
+                       % (cls._num_running_processes, cls._max_processes()))
+            email_manager.manager.enqueue_notify_email(subject, message)
+            cls._notification_on = False
+
+
+    @classmethod
+    def _reset_notification_switch_if_necessary(cls):
+        """Reset _notification_on if necessary.
+
+        Set _notification_on to True in the following cases:
+        1) If the max processes configuration limit changes;
+        2) If _notification_on is False and the number of running processes
+           drops below the level defined by REVIVE_NOTIFICATION_THRESHOLD.
+
+        """
+        if cls._last_known_max_processes != cls._max_processes():
+            cls._notification_on = True
+            cls._last_known_max_processes = cls._max_processes()
+            return
+        percentage = float(cls._num_running_processes) / cls._max_processes()
+        if (not cls._notification_on and
+            percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
+            cls._notification_on = True
+
+
+    def _try_starting_process(self):
+        self._reset_notification_switch_if_necessary()
+        if not self._can_run_new_process():
+            self._notify_process_limit_hit()
+            return
+
+        # actually run the command
+        super(SelfThrottledPostJobTask, self).run()
+        if self._process_started():
+            self._increment_running_processes()
+
+
+    def finished(self, success):
+        super(SelfThrottledPostJobTask, self).finished(success)
+        if self._process_started():
+            self._decrement_running_processes()
+
+
+class GatherLogsTask(PostJobTask):
+    """
+    Task responsible for
+    * gathering uncollected logs (if Autoserv crashed hard or was killed)
+    * copying logs to the results repository
+    * spawning CleanupTasks for hosts, if necessary
+    * spawning a FinalReparseTask for the job
+    * setting the final status of the host, directly or through a cleanup
+    """
+    def __init__(self, queue_entries, recover_run_monitor=None):
+        self._job = queue_entries[0].job
+        super(GatherLogsTask, self).__init__(
+            queue_entries, log_file_name='.collect_crashinfo.log')
+        self._set_ids(queue_entries=queue_entries)
+
+
+    # TODO: Refactor into autoserv_utils. crbug.com/243090
+    def _generate_command(self, results_dir):
+        host_list = ','.join(queue_entry.host.hostname
+                             for queue_entry in self.queue_entries)
+        return [autoserv_utils.autoserv_path, '-p',
+                '--pidfile-label=%s' % self._pidfile_label(),
+                '--use-existing-results', '--collect-crashinfo',
+                '-m', host_list, '-r', results_dir]
+
+
+    @property
+    def num_processes(self):
+        return len(self.queue_entries)
+
+
+    def _pidfile_name(self):
+        return drone_manager.CRASHINFO_PID_FILE
+
+
+    def prolog(self):
+        self._check_queue_entry_statuses(
+                self.queue_entries,
+                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
+                allowed_host_statuses=(models.Host.Status.RUNNING,))
+
+        super(GatherLogsTask, self).prolog()
+
+
+    def epilog(self):
+        super(GatherLogsTask, self).epilog()
+        self._parse_results(self.queue_entries)
+        self._reboot_hosts()
+
+
+    def _reboot_hosts(self):
+        if self._autoserv_monitor.has_process():
+            final_success = (self._final_status() ==
+                             models.HostQueueEntry.Status.COMPLETED)
+            num_tests_failed = self._autoserv_monitor.num_tests_failed()
+        else:
+            final_success = False
+            num_tests_failed = 0
+        reboot_after = self._job.reboot_after
+        do_reboot = (
+                # always reboot after aborted jobs
+                self._final_status() == models.HostQueueEntry.Status.ABORTED
+                or reboot_after == model_attributes.RebootAfter.ALWAYS
+                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
+                    and final_success and num_tests_failed == 0)
+                or num_tests_failed > 0)
+
+        for queue_entry in self.queue_entries:
+            if do_reboot:
+                # don't pass the queue entry to the CleanupTask. if the cleanup
+                # fails, the job doesn't care -- it's over.
+                models.SpecialTask.objects.create(
+                        host=models.Host.objects.get(id=queue_entry.host.id),
+                        task=models.SpecialTask.Task.CLEANUP,
+                        requested_by=self._job.owner_model())
+            else:
+                queue_entry.host.set_status(models.Host.Status.READY)
+
+
+    def run(self):
+        autoserv_exit_code = self._autoserv_monitor.exit_code()
+        # only run if Autoserv exited due to some signal. if we have no exit
+        # code, assume something bad (and signal-like) happened.
+        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
+            super(GatherLogsTask, self).run()
+        else:
+            self.finished(True)
+
+
+class FinalReparseTask(SelfThrottledPostJobTask):
+    def __init__(self, queue_entries):
+        super(FinalReparseTask, self).__init__(queue_entries,
+                                               log_file_name='.parse.log')
+        # don't use _set_ids, since we don't want to set the host_ids
+        self.queue_entry_ids = [entry.id for entry in queue_entries]
+
+
+    def _generate_command(self, results_dir):
+        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
+                results_dir]
+
+
+    @property
+    def num_processes(self):
+        return 0 # don't include parser processes in accounting
+
+
+    def _pidfile_name(self):
+        return drone_manager.PARSER_PID_FILE
+
+
+    @classmethod
+    def _max_processes(cls):
+        return scheduler_config.config.max_parse_processes
+
+
+    def prolog(self):
+        self._check_queue_entry_statuses(
+                self.queue_entries,
+                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
+
+        super(FinalReparseTask, self).prolog()
+
+
+    def epilog(self):
+        super(FinalReparseTask, self).epilog()
+        self._archive_results(self.queue_entries)
+
+
+class ArchiveResultsTask(SelfThrottledPostJobTask):
+    _ARCHIVING_FAILED_FILE = '.archiver_failed'
+
+    def __init__(self, queue_entries):
+        super(ArchiveResultsTask, self).__init__(queue_entries,
+                                                 log_file_name='.archiving.log')
+        # don't use _set_ids, since we don't want to set the host_ids
+        self.queue_entry_ids = [entry.id for entry in queue_entries]
+
+
+    def _pidfile_name(self):
+        return drone_manager.ARCHIVER_PID_FILE
+
+
+    # TODO: Refactor into autoserv_utils. crbug.com/243090
+    def _generate_command(self, results_dir):
+        return [autoserv_utils.autoserv_path, '-p',
+                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
+                '--use-existing-results', '--control-filename=control.archive',
+                os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
+                             'archive_results.control.srv')]
+
+
+    @classmethod
+    def _max_processes(cls):
+        return scheduler_config.config.max_transfer_processes
+
+
+    def prolog(self):
+        self._check_queue_entry_statuses(
+                self.queue_entries,
+                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
+
+        super(ArchiveResultsTask, self).prolog()
+
+
+    def epilog(self):
+        super(ArchiveResultsTask, self).epilog()
+        if not self.success and self._paired_with_monitor().has_process():
+            failed_file = os.path.join(self._working_directory(),
+                                       self._ARCHIVING_FAILED_FILE)
+            paired_process = self._paired_with_monitor().get_process()
+            _drone_manager.write_lines_to_file(
+                    failed_file, ['Archiving failed with exit code %s'
+                                  % self.monitor.exit_code()],
+                    paired_with_process=paired_process)
+        self._set_all_statuses(self._final_status())
diff --git a/scheduler/prejob_task.py b/scheduler/prejob_task.py
new file mode 100644
index 0000000..37a5fff
--- /dev/null
+++ b/scheduler/prejob_task.py
@@ -0,0 +1,368 @@
+#pylint: disable-msg=C0111
+
+"""
+Prejob tasks.
+
+Prejob tasks _usually_ run before a job and verify the state of a machine.
+Cleanup and repair are exceptions: cleanup can also run after a job, while
+repair runs whenever the host needs it, which could be before or after a job.
+Most of the work specific to this module is achieved through the prolog
+and epilog of each task.
+
+All prejob tasks must have a host, though they may not have an HQE. If a
+prejob task has an HQE, it will activate the HQE through its on_pending
+method on successful completion. A row in afe_special_tasks with values:
+    host=C1, unlocked, is_active=0, is_complete=0, type=Verify
+will indicate to the scheduler that it needs to schedule a new special task
+of type=Verify against host C1 (see the sketch after this docstring). While
+the special task is running
+the scheduler only monitors it through the Agent, and its is_active bit=1.
+Once a special task finishes, we set its is_active=0, is_complete=1 and
+success bits, so the scheduler ignores it.
+HQE.on_pending:
+    Host, HQE -> Pending, Starting
+    This status is acted upon in the scheduler, to assign an AgentTask.
+PreJobTask:
+    epilog:
+        failure:
+            requeue hqe
+            repair the host
+Child PreJobTasks:
+    prolog:
+        set Host, HQE status
+    epilog:
+        success:
+            on_pending
+        failure:
+            repair through PreJobTask
+            set Host, HQE status
+"""
+import logging
+import os
+
+from autotest_lib.client.common_lib import host_protections
+from autotest_lib.frontend.afe import models
+from autotest_lib.scheduler import agent_task, scheduler_config
+from autotest_lib.server import autoserv_utils
+from autotest_lib.server.cros import provision
+
+
+class PreJobTask(agent_task.SpecialAgentTask):
+    def _copy_to_results_repository(self):
+        if not self.queue_entry or self.queue_entry.meta_host:
+            return
+
+        self.queue_entry.set_execution_subdir()
+        log_name = os.path.basename(self.task.execution_path())
+        source = os.path.join(self.task.execution_path(), 'debug',
+                              'autoserv.DEBUG')
+        destination = os.path.join(
+                self.queue_entry.execution_path(), log_name)
+
+        self.monitor.try_copy_to_results_repository(
+                source, destination_path=destination)
+
+
+    def epilog(self):
+        super(PreJobTask, self).epilog()
+
+        if self.success:
+            return
+
+        if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
+            # effectively ignore failure for these hosts
+            self.success = True
+            return
+
+        if self.queue_entry:
+            # If we requeue a HQE, we should cancel any remaining pre-job
+            # tasks against this host, otherwise we'll be left in a state
+            # where a queued HQE has special tasks to run against a host.
+            models.SpecialTask.objects.filter(
+                    queue_entry__id=self.queue_entry.id,
+                    host__id=self.host.id,
+                    is_complete=0).update(is_complete=1, success=0)
+
+            previous_provisions = models.SpecialTask.objects.filter(
+                    task=models.SpecialTask.Task.PROVISION,
+                    queue_entry_id=self.queue_entry.id).count()
+            if (previous_provisions >
+                scheduler_config.config.max_provision_retries):
+                self._actually_fail_queue_entry()
+                # This abort will mark the aborted bit on the HQE itself, to
+                # signify that we're killing it.  Technically it also will do
+                # the recursive aborting of all child jobs, but that shouldn't
+                # matter here, as only suites have children, and those are
+                # hostless and thus don't have provisioning.
+                # TODO(milleral) http://crbug.com/188217
+                # However, we can't actually do this yet, as if we set the
+                # abort bit the FinalReparseTask will set the status of the HQE
+                # to ABORTED, which then means that we don't show the status in
+                # run_suite.  So in the meantime, don't mark the HQE as
+                # aborted.
+                # queue_entry.abort()
+            else:
+                # requeue() must come after handling provision retries, since
+                # _actually_fail_queue_entry needs an execution subdir.
+                # We also don't want to requeue if we hit the provision retry
+                # limit, since then we overwrite the PARSING state of the HQE.
+                self.queue_entry.requeue()
+
+            previous_repairs = models.SpecialTask.objects.filter(
+                    task=models.SpecialTask.Task.REPAIR,
+                    queue_entry_id=self.queue_entry.id).count()
+            if previous_repairs >= scheduler_config.config.max_repair_limit:
+                self.host.set_status(models.Host.Status.REPAIR_FAILED)
+                self._fail_queue_entry()
+                return
+
+            queue_entry = models.HostQueueEntry.objects.get(
+                    id=self.queue_entry.id)
+        else:
+            queue_entry = None
+
+        models.SpecialTask.objects.create(
+                host=models.Host.objects.get(id=self.host.id),
+                task=models.SpecialTask.Task.REPAIR,
+                queue_entry=queue_entry,
+                requested_by=self.task.requested_by)
+
+
+    def _should_pending(self):
+        """
+        Decide if we should call the host queue entry's on_pending method.
+        We should if:
+        1) There exists an associated host queue entry.
+        2) The current special task completed successfully.
+        3) There do not exist any more special tasks to be run before the
+           host queue entry starts.
+
+        @returns: True if we should call pending, false if not.
+
+        """
+        if not self.queue_entry or not self.success:
+            return False
+
+        # We know if this is the last one when we create it, so we could add
+        # another column to the database to keep track of this information, but
+        # I expect the overhead of querying here to be minimal.
+        queue_entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
+        queued = models.SpecialTask.objects.filter(
+                host__id=self.host.id, is_active=False,
+                is_complete=False, queue_entry=queue_entry)
+        queued = queued.exclude(id=self.task.id)
+        return queued.count() == 0
+
+
+class VerifyTask(PreJobTask):
+    TASK_TYPE = models.SpecialTask.Task.VERIFY
+
+
+    def __init__(self, task):
+        super(VerifyTask, self).__init__(task, ['-v'])
+        self._set_ids(host=self.host, queue_entries=[self.queue_entry])
+
+
+    def prolog(self):
+        super(VerifyTask, self).prolog()
+
+        logging.info("starting verify on %s", self.host.hostname)
+        if self.queue_entry:
+            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
+        self.host.set_status(models.Host.Status.VERIFYING)
+
+        # Delete any queued manual reverifies for this host.  One verify will do
+        # and there's no need to keep records of other requests.
+        self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
+                                  keep_last_one=True)
+
+
+    def epilog(self):
+        super(VerifyTask, self).epilog()
+        if self.success:
+            if self._should_pending():
+                self.queue_entry.on_pending()
+            else:
+                self.host.set_status(models.Host.Status.READY)
+
+
+class CleanupTask(PreJobTask):
+    # note this can also run post-job, but when it does, it's running standalone
+    # against the host (not related to the job), so it's not considered a
+    # PostJobTask
+
+    TASK_TYPE = models.SpecialTask.Task.CLEANUP
+
+
+    def __init__(self, task, recover_run_monitor=None):
+        super(CleanupTask, self).__init__(task, ['--cleanup'])
+        self._set_ids(host=self.host, queue_entries=[self.queue_entry])
+
+
+    def prolog(self):
+        super(CleanupTask, self).prolog()
+        logging.info("starting cleanup task for host: %s", self.host.hostname)
+        self.host.set_status(models.Host.Status.CLEANING)
+        if self.queue_entry:
+            self.queue_entry.set_status(models.HostQueueEntry.Status.CLEANING)
+
+
+    def _finish_epilog(self):
+        if not self.queue_entry or not self.success:
+            return
+
+        do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
+        should_run_verify = (
+                self.queue_entry.job.run_verify
+                and self.host.protection != do_not_verify_protection)
+        if should_run_verify:
+            entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
+            models.SpecialTask.objects.create(
+                    host=models.Host.objects.get(id=self.host.id),
+                    queue_entry=entry,
+                    task=models.SpecialTask.Task.VERIFY)
+        else:
+            if self._should_pending():
+                self.queue_entry.on_pending()
+
+
+    def epilog(self):
+        super(CleanupTask, self).epilog()
+
+        if self.success:
+            self.host.update_field('dirty', 0)
+            self.host.set_status(models.Host.Status.READY)
+
+        self._finish_epilog()
+
+
+class ResetTask(PreJobTask):
+    """Task to reset a DUT, including cleanup and verify."""
+    # note this can also run post-job, but when it does, it's running standalone
+    # against the host (not related to the job), so it's not considered a
+    # PostJobTask
+
+    TASK_TYPE = models.SpecialTask.Task.RESET
+
+
+    def __init__(self, task, recover_run_monitor=None):
+        super(ResetTask, self).__init__(task, ['--reset'])
+        self._set_ids(host=self.host, queue_entries=[self.queue_entry])
+
+
+    def prolog(self):
+        super(ResetTask, self).prolog()
+        logging.info('starting reset task for host: %s',
+                     self.host.hostname)
+        self.host.set_status(models.Host.Status.RESETTING)
+        if self.queue_entry:
+            self.queue_entry.set_status(models.HostQueueEntry.Status.RESETTING)
+
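+        # A reset covers both cleanup and verify, so any cleanups and
+        # reverifies already queued against this host are redundant.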
+        # Delete any queued cleanups for this host.
+        self.remove_special_tasks(models.SpecialTask.Task.CLEANUP,
+                                  keep_last_one=False)
+
+        # Delete any queued reverifies for this host.
+        self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
+                                  keep_last_one=False)
+
+        # Only one reset is needed.
+        self.remove_special_tasks(models.SpecialTask.Task.RESET,
+                                  keep_last_one=True)
+
+
+    def epilog(self):
+        super(ResetTask, self).epilog()
+
+        if self.success:
+            self.host.update_field('dirty', 0)
+
+            if self._should_pending():
+                self.queue_entry.on_pending()
+            else:
+                self.host.set_status(models.Host.Status.READY)
+
+
+class ProvisionTask(PreJobTask):
+    TASK_TYPE = models.SpecialTask.Task.PROVISION
+
+    def __init__(self, task):
+        # Provisioning requires that we be associated with a job/queue entry
+        assert task.queue_entry, "No HQE associated with provision task!"
+        # task.queue_entry is an afe model HostQueueEntry object.
+        # self.queue_entry is a scheduler models HostQueueEntry object, but
+        # it gets constructed and assigned in __init__, so it's not available
+        # yet.  Therefore, we're stuck pulling labels off of the afe model
+        # so that we can pass the --provision args into the __init__ call.
+        labels = {x.name for x in task.queue_entry.job.dependency_labels.all()}
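+        # filter_labels() separates out the provisionable labels; only those
+        # are handed to autoserv via --provision.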
+        _, provisionable = provision.filter_labels(labels)
+        extra_command_args = ['--provision', ','.join(provisionable)]
+        super(ProvisionTask, self).__init__(task, extra_command_args)
+        self._set_ids(host=self.host, queue_entries=[self.queue_entry])
+
+
+    def _command_line(self):
+        # If we give queue_entry to _autoserv_command_line, then it will append
+        # -c for this invocation if the queue_entry is a client side test. We
+        # don't want that, as it messes with provisioning, so we just drop it
+        # from the arguments here.
+        # Note that we also don't verify job_repo_url as provisioning tasks are
+        # required to stage whatever content we need, and the job itself will
+        # force autotest to be staged if it isn't already.
+        return autoserv_utils._autoserv_command_line(self.host.hostname,
+                                                     self._extra_command_args)
+
+
+    def prolog(self):
+        super(ProvisionTask, self).prolog()
+        # TODO: check for a previous provision task and abort if one exists.
+        logging.info("starting provision task for host: %s", self.host.hostname)
+        self.queue_entry.set_status(
+                models.HostQueueEntry.Status.PROVISIONING)
+        self.host.set_status(models.Host.Status.PROVISIONING)
+
+
+    def epilog(self):
+        super(ProvisionTask, self).epilog()
+
+        if self._should_pending():
+            self.queue_entry.on_pending()
+        else:
+            self.host.set_status(models.Host.Status.READY)
+
+
+class RepairTask(agent_task.SpecialAgentTask):
+    TASK_TYPE = models.SpecialTask.Task.REPAIR
+
+
+    def __init__(self, task):
+        """\
+        queue_entry: queue entry to mark failed if this repair fails.
+        """
+        protection = host_protections.Protection.get_string(
+                task.host.protection)
+        # Normalize the protection name into its attribute-style form before
+        # passing it to autoserv via --host-protection.
+        protection = host_protections.Protection.get_attr_name(protection)
+
+        super(RepairTask, self).__init__(
+                task, ['-R', '--host-protection', protection])
+
+        # *don't* include the queue entry in IDs -- if the queue entry is
+        # aborted, we want to leave the repair task running
+        self._set_ids(host=self.host)
+
+
+    def prolog(self):
+        super(RepairTask, self).prolog()
+        logging.info("repair_task starting")
+        self.host.set_status(models.Host.Status.REPAIRING)
+
+
+    def epilog(self):
+        super(RepairTask, self).epilog()
+
+        if self.success:
+            self.host.set_status(models.Host.Status.READY)
+        else:
+            self.host.set_status(models.Host.Status.REPAIR_FAILED)
+            if self.queue_entry:
+                self._fail_queue_entry()