Add support to the scheduler to run autoserv --collect_crashinfo after a job finishes or is aborted.
* added new state "Gathering" for when we're running collect_crashinfo and copying logs to the results repository
* added new GatherLogsTask to the scheduler to perform these two tasks, and made it get run either after a job finishes or after a job is aborted.  this task shares a lot with FinalReparseTask, so extracted common code into a new PostJobTask.
* made changes to scheduler/drone code to support generic monitoring and recovery of processes via pidfiles, since we need to be able to recover the collect_crashinfo processes too.  this will also made the scheduler recover parse processes instead of just killing them as it does now, which is nice.
* changed abort logic significantly.  since we now need to put aborted jobs through the gathering and parsing stages, but then know to put them into "aborted" afterwards, we can't depend on the old path of abort -> aborting -> aborted statuses.  instead, we need to add an "aborted" flag to the HQE DB table and use that.  this actually makes things generally cleaner in my opinion -- for one,  we can get rid of the "Abort" and "Aborting" statuses altogether.  added a migration to add this flag, edited model and relevant logic appropriately, including changing how job statuses are reported for aborted entries.

Signed-off-by: Steve Howard <showard@google.com>


git-svn-id: http://test.kernel.org/svn/autotest/trunk@3031 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/frontend/afe/doctests/001_rpc_test.txt b/frontend/afe/doctests/001_rpc_test.txt
index d94f426..5e59fdc 100644
--- a/frontend/afe/doctests/001_rpc_test.txt
+++ b/frontend/afe/doctests/001_rpc_test.txt
@@ -481,7 +481,9 @@
 ...  'status': 'Queued',
 ...  'deleted': 0,
 ...  'execution_subdir': '',
-...  'atomic_group': None})
+...  'atomic_group': None,
+...  'aborted': False,
+...  'full_status': 'Queued'})
 True
 >>> data[2] == (
 ... {'active': 0,
@@ -493,7 +495,9 @@
 ...  'status': 'Queued',
 ...  'deleted': 0,
 ...  'execution_subdir': '',
-...  'atomic_group': None})
+...  'atomic_group': None,
+...  'aborted': False,
+...  'full_status': 'Queued'})
 True
 >>> rpc_interface.get_num_host_queue_entries(job=1)
 4
@@ -510,7 +514,7 @@
 >>> rpc_interface.abort_host_queue_entries(job__id=1)
 >>> data = rpc_interface.get_jobs_summary(id=1)
 >>> data[0]['status_counts']
-{'Abort': 4}
+{'Aborted (Queued)': 4}
 
 # Remove the two hosts in my_label
 >>> rpc_interface.delete_host('my_label_host1')
diff --git a/frontend/afe/model_logic.py b/frontend/afe/model_logic.py
index 509f960..81ad9a8 100644
--- a/frontend/afe/model_logic.py
+++ b/frontend/afe/model_logic.py
@@ -565,9 +565,15 @@
         object_dict = dict((field_name, getattr(self, field_name))
                            for field_name in fields)
         self.clean_object_dicts([object_dict])
+        self._postprocess_object_dict(object_dict)
         return object_dict
 
 
+    def _postprocess_object_dict(self, object_dict):
+        """For subclasses to override."""
+        pass
+
+
     @classmethod
     def get_valid_manager(cls):
         return cls.objects
diff --git a/frontend/afe/models.py b/frontend/afe/models.py
index 787793b..c53eb42 100644
--- a/frontend/afe/models.py
+++ b/frontend/afe/models.py
@@ -583,16 +583,18 @@
         id_list = '(%s)' % ','.join(str(job_id) for job_id in job_ids)
         cursor = connection.cursor()
         cursor.execute("""
-            SELECT job_id, status, COUNT(*)
+            SELECT job_id, status, aborted, complete, COUNT(*)
             FROM host_queue_entries
             WHERE job_id IN %s
-            GROUP BY job_id, status
+            GROUP BY job_id, status, aborted, complete
             """ % id_list)
         all_job_counts = {}
         for job_id in job_ids:
             all_job_counts[job_id] = {}
-        for job_id, status, count in cursor.fetchall():
-            all_job_counts[job_id][status] = count
+        for job_id, status, aborted, complete, count in cursor.fetchall():
+            full_status = HostQueueEntry.compute_full_status(status, aborted,
+                                                             complete)
+            all_job_counts[job_id][full_status] = count
         return all_job_counts
 
 
@@ -744,11 +746,10 @@
 
 class HostQueueEntry(dbmodels.Model, model_logic.ModelExtensions):
     Status = enum.Enum('Queued', 'Starting', 'Verifying', 'Pending', 'Running',
-                       'Parsing', 'Abort', 'Aborting', 'Aborted', 'Completed',
+                       'Gathering', 'Parsing', 'Aborted', 'Completed',
                        'Failed', 'Stopped', string_values=True)
-    ABORT_STATUSES = (Status.ABORT, Status.ABORTING, Status.ABORTED)
     ACTIVE_STATUSES = (Status.STARTING, Status.VERIFYING, Status.PENDING,
-                       Status.RUNNING, Status.ABORTING)
+                       Status.RUNNING, Status.GATHERING)
     COMPLETE_STATUSES = (Status.ABORTED, Status.COMPLETED, Status.FAILED,
                          Status.STOPPED)
 
@@ -764,6 +765,7 @@
     # If atomic_group is set, this is a virtual HostQueueEntry that will
     # be expanded into many actual hosts within the group at schedule time.
     atomic_group = dbmodels.ForeignKey(AtomicGroup, blank=True, null=True)
+    aborted = dbmodels.BooleanField(default=False)
 
     objects = model_logic.ExtendedManager()
 
@@ -788,11 +790,7 @@
 
 
     def _set_active_and_complete(self):
-        if self.status == self.Status.ABORT:
-            # must leave active flag unchanged so scheduler knows if entry was
-            # active before abort.
-            return
-        elif self.status in self.ACTIVE_STATUSES:
+        if self.status in self.ACTIVE_STATUSES:
             self.active, self.complete = True, False
         elif self.status in self.COMPLETE_STATUSES:
             self.active, self.complete = False, True
@@ -822,11 +820,28 @@
     def abort(self, user):
         # this isn't completely immune to race conditions since it's not atomic,
         # but it should be safe given the scheduler's behavior.
-        if not self.complete and self.status not in self.ABORT_STATUSES:
-            self.status = HostQueueEntry.Status.ABORT
+        if not self.complete and not self.aborted:
             self.log_abort(user)
+            self.aborted = True
             self.save()
 
+
+    @classmethod
+    def compute_full_status(cls, status, aborted, complete):
+        if aborted and not complete:
+            return 'Aborted (%s)' % status
+        return status
+
+
+    def full_status(self):
+        return self.compute_full_status(self.status, self.aborted,
+                                        self.complete)
+
+
+    def _postprocess_object_dict(self, object_dict):
+        object_dict['full_status'] = self.full_status()
+
+
     class Meta:
         db_table = 'host_queue_entries'
 
diff --git a/frontend/afe/rpc_interface.py b/frontend/afe/rpc_interface.py
index abc9786..5ce6f90 100644
--- a/frontend/afe/rpc_interface.py
+++ b/frontend/afe/rpc_interface.py
@@ -677,16 +677,15 @@
     result['reboot_after_options'] = models.RebootAfter.names
     result['motd'] = rpc_utils.get_motd()
 
-    result['status_dictionary'] = {"Abort": "Abort",
-                                   "Aborted": "Aborted",
+    result['status_dictionary'] = {"Aborted": "Aborted",
                                    "Verifying": "Verifying Host",
                                    "Pending": "Waiting on other hosts",
                                    "Running": "Running autoserv",
                                    "Completed": "Autoserv completed",
                                    "Failed": "Failed to complete",
-                                   "Aborting": "Abort in progress",
                                    "Queued": "Queued",
                                    "Starting": "Next in host's queue",
                                    "Stopped": "Other host(s) failed verify",
-                                   "Parsing": "Awaiting parse of final results"}
+                                   "Parsing": "Awaiting parse of final results",
+                                   "Gathering": "Gathering log files"}
     return result
diff --git a/frontend/client/src/autotest/afe/JobDetailView.java b/frontend/client/src/autotest/afe/JobDetailView.java
index 865d10b..07c5c83 100644
--- a/frontend/client/src/autotest/afe/JobDetailView.java
+++ b/frontend/client/src/autotest/afe/JobDetailView.java
@@ -43,7 +43,7 @@
 public class JobDetailView extends DetailView implements TableWidgetFactory, TableActionsListener {
     private static final String[][] JOB_HOSTS_COLUMNS = {
         {DataTable.CLICKABLE_WIDGET_COLUMN, ""}, // selection checkbox 
-        {"hostname", "Host"}, {"status", "Status"}, 
+        {"hostname", "Host"}, {"full_status", "Status"}, 
         {"host_status", "Host Status"}, {"host_locked", "Host Locked"},
         // columns for status log and debug log links
         {DataTable.CLICKABLE_WIDGET_COLUMN, ""}, {DataTable.CLICKABLE_WIDGET_COLUMN, ""}  
@@ -111,7 +111,7 @@
                 JSONObject counts = jobObject.get("status_counts").isObject();
                 String countString = AfeUtils.formatStatusCounts(counts, ", ");
                 showText(countString, "view_status");
-                abortButton.setVisible(!allFinishedCounts(counts));
+                abortButton.setVisible(isAnyEntryAbortable(counts));
                 
                 String jobTag = AfeUtils.getJobTag(jobObject);
                 pointToResults(getResultsURL(jobId), getLogsURL(jobTag), 
@@ -133,18 +133,17 @@
         });
     }
     
-    protected boolean allFinishedCounts(JSONObject statusCounts) {
-        Set<String> keys = statusCounts.keySet();
-        for (String key : keys) {
-            if (!(key.equals("Completed") || 
-                  key.equals("Failed") ||
-                  key.equals("Aborting") ||
-                  key.equals("Abort") ||
-                  key.equals("Aborted") ||
-                  key.equals("Stopped")))
-                return false;
+    protected boolean isAnyEntryAbortable(JSONObject statusCounts) {
+        Set<String> statuses = statusCounts.keySet();
+        for (String status : statuses) {
+            if (!(status.equals("Completed") || 
+                  status.equals("Failed") ||
+                  status.equals("Stopped") ||
+                  status.startsWith("Aborted"))) {
+                return true;
+            }
         }
-        return true;
+        return false;
     }
     
     @Override
diff --git a/frontend/migrations/031_add_hqe_aborted_flag.py b/frontend/migrations/031_add_hqe_aborted_flag.py
new file mode 100644
index 0000000..57c8d0f
--- /dev/null
+++ b/frontend/migrations/031_add_hqe_aborted_flag.py
@@ -0,0 +1,11 @@
+def migrate_up(manager):
+    manager.execute('ALTER TABLE host_queue_entries '
+                    'ADD COLUMN `aborted` bool NOT NULL DEFAULT FALSE')
+    manager.execute("UPDATE host_queue_entries SET aborted = true WHERE "
+                    "status IN ('Abort', 'Aborting', 'Aborted')")
+
+
+def migrate_down(manager):
+    manager.execute("UPDATE host_queue_entries SET status = 'Abort' WHERE "
+                    "aborted AND status != 'Aborted'")
+    manager.execute('ALTER TABLE host_queue_entries DROP COLUMN `aborted`')
diff --git a/scheduler/drone_manager.py b/scheduler/drone_manager.py
index d44165c..026d6be 100644
--- a/scheduler/drone_manager.py
+++ b/scheduler/drone_manager.py
@@ -4,8 +4,6 @@
 from autotest_lib.scheduler import email_manager, drone_utility, drones
 from autotest_lib.scheduler import scheduler_config
 
-_AUTOSERV_PID_FILE = '.autoserv_execute'
-
 
 class DroneManagerError(Exception):
     pass
@@ -324,12 +322,10 @@
 
     def get_orphaned_autoserv_processes(self):
         """
-        Returns a dict mapping execution tags to AutoservProcess objects for
-        orphaned processes only.
+        Returns a set of Process objects for orphaned processes only.
         """
-        return dict((execution_tag, process)
-                    for execution_tag, process in self._processes.iteritems()
-                    if process.ppid == 1)
+        return set(process for process in self._process_set
+                   if process.ppid == 1)
 
 
     def get_process_for(self, execution_tag):
@@ -343,7 +339,7 @@
         """
         Kill the given process.
         """
-        logging.info('killing %s' % process)
+        logging.info('killing %s', process)
         drone = self._get_drone_for_process(process)
         drone.queue_call('kill_process', process)
 
@@ -408,16 +404,15 @@
         return drone_to_use
 
 
-    def execute_command(self, command, working_directory, log_file=None,
-                        pidfile_name=None, paired_with_pidfile=None):
+    def execute_command(self, command, working_directory, pidfile_name,
+                        log_file=None, paired_with_pidfile=None):
         """
         Execute the given command, taken as an argv list.
 
         * working_directory: directory in which the pidfile will be written
+        * pidfile_name: gives the name of the pidfile this process will write
         * log_file (optional): specifies a path (in the results repository) to
           hold command output.
-        * pidfile_name (optional): gives the name of the pidfile this process
-          will write
         * paired_with_pidfile (optional): a PidfileId for an already-executed
           process; the new process will execute on the same drone as the
           previous process.
@@ -426,8 +421,6 @@
         if not log_file:
             log_file = self.get_temporary_path('execute')
         log_file = self.absolute_path(log_file)
-        if not pidfile_name:
-            pidfile_name = _AUTOSERV_PID_FILE
 
         if paired_with_pidfile:
             drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
@@ -447,9 +440,8 @@
         return pidfile_id
 
 
-    def get_pidfile_id_from(self, execution_tag):
-        path = os.path.join(self.absolute_path(execution_tag),
-                            _AUTOSERV_PID_FILE)
+    def get_pidfile_id_from(self, execution_tag, pidfile_name):
+        path = os.path.join(self.absolute_path(execution_tag), pidfile_name)
         return PidfileId(path)
 
 
diff --git a/scheduler/drone_utility.py b/scheduler/drone_utility.py
index 7eba407..b830078 100644
--- a/scheduler/drone_utility.py
+++ b/scheduler/drone_utility.py
@@ -54,9 +54,6 @@
             shutil.rmtree(temporary_directory)
         self._ensure_directory_exists(temporary_directory)
 
-        # make sure there are no old parsers running
-        os.system('killall parse')
-
 
     def _warn(self, warning):
         self.warnings.append(warning)
@@ -96,6 +93,19 @@
 
 
     def refresh(self, pidfile_paths):
+        """
+        pidfile_paths should be a list of paths to check for pidfiles.
+
+        Returns a dict containing:
+        * pidfiles: dict mapping pidfile paths to file contents, for pidfiles
+        that exist.
+        * autoserv_processes: list of dicts corresponding to running autoserv
+        processes.  each dict contain pid, pgid, ppid, comm, and args (see
+        "man ps" for details).
+        * parse_processes: likewise, for parse processes.
+        * pidfiles_second_read: same info as pidfiles, but gathered after the
+        processes are scanned.
+        """
         results = {
             'pidfiles' : self._read_pidfiles(pidfile_paths),
             'autoserv_processes' : self._refresh_processes('autoserv'),
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index b1a023e..b52e9f8 100644
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -37,6 +37,13 @@
 # how long to wait for autoserv to write a pidfile
 PIDFILE_TIMEOUT = 5 * 60 # 5 min
 
+_AUTOSERV_PID_FILE = '.autoserv_execute'
+_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
+_PARSER_PID_FILE = '.parser_execute'
+
+_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
+                      _PARSER_PID_FILE)
+
 # error message to leave in results dir when an autoserv process disappears
 # mysteriously
 _LOST_PROCESS_ERROR = """\
@@ -206,15 +213,6 @@
     sys.stderr = err_fd
 
 
-def queue_entries_to_abort():
-    rows = _db.execute("""
-            SELECT * FROM host_queue_entries WHERE status='Abort';
-                    """)
-
-    qe = [HostQueueEntry(row=i) for i in rows]
-    return qe
-
-
 def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                            queue_entry=None):
     autoserv_argv = [_autoserv_path, '-p', '-m', machines,
@@ -638,7 +636,7 @@
         """
         Find agents corresponding to the specified queue_entry.
         """
-        return self._queue_entry_agents.get(queue_entry.id, set())
+        return list(self._queue_entry_agents.get(queue_entry.id, set()))
 
 
     def host_has_agent(self, host):
@@ -656,34 +654,11 @@
                                        agent.queue_entry_ids, agent)
 
 
-    def num_running_processes(self):
-        return sum(agent.num_processes for agent in self._agents
-                   if agent.is_running())
-
-
-    def _extract_execution_tag(self, command_line):
-        match = re.match(r'.* -P (\S+) ', command_line)
-        if not match:
-            return None
-        return match.group(1)
-
-
-    def _recover_queue_entries(self, queue_entries, run_monitor):
-        assert len(queue_entries) > 0
-        queue_task = RecoveryQueueTask(job=queue_entries[0].job,
-                                       queue_entries=queue_entries,
-                                       run_monitor=run_monitor)
-        self.add_agent(Agent(tasks=[queue_task],
-                             num_processes=len(queue_entries)))
-
-
     def _recover_processes(self):
         self._register_pidfiles()
         _drone_manager.refresh()
-        self._recover_running_entries()
-        self._recover_aborting_entries()
+        self._recover_all_recoverable_entries()
         self._requeue_other_active_entries()
-        self._recover_parsing_entries()
         self._reverify_remaining_hosts()
         # reinitialize drones after killing orphaned processes, since they can
         # leave around files when they die
@@ -695,57 +670,103 @@
         # during recovery we may need to read pidfiles for both running and
         # parsing entries
         queue_entries = HostQueueEntry.fetch(
-            where="status IN ('Running', 'Parsing')")
+            where="status IN ('Running', 'Gathering', 'Parsing')")
         for queue_entry in queue_entries:
-            pidfile_id = _drone_manager.get_pidfile_id_from(
-                queue_entry.execution_tag())
-            _drone_manager.register_pidfile(pidfile_id)
+            for pidfile_name in _ALL_PIDFILE_NAMES:
+                pidfile_id = _drone_manager.get_pidfile_id_from(
+                    queue_entry.execution_tag(), pidfile_name=pidfile_name)
+                _drone_manager.register_pidfile(pidfile_id)
 
 
-    def _recover_running_entries(self):
-        orphans = _drone_manager.get_orphaned_autoserv_processes()
-
-        queue_entries = HostQueueEntry.fetch(where="status = 'Running'")
-        requeue_entries = []
+    def _recover_entries_with_status(self, status, orphans, pidfile_name,
+                                     recover_entries_fn):
+        queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
         for queue_entry in queue_entries:
             if self.get_agents_for_entry(queue_entry):
                 # synchronous job we've already recovered
                 continue
+            queue_entries = queue_entry.job.get_group_entries(queue_entry)
             execution_tag = queue_entry.execution_tag()
             run_monitor = PidfileRunMonitor()
-            run_monitor.attach_to_existing_process(execution_tag)
+            run_monitor.attach_to_existing_process(execution_tag,
+                                                   pidfile_name=pidfile_name)
             if not run_monitor.has_process():
-                # autoserv apparently never got run, so let it get requeued
+                # execution apparently never happened
+                recover_entries_fn(queue_entry.job, queue_entries, None)
                 continue
-            queue_entries = queue_entry.job.get_group_entries(queue_entry)
-            logging.info('Recovering %s (process %s)', 
-                         (', '.join(str(entry) for entry in queue_entries),
-                         run_monitor.get_process()))
-            self._recover_queue_entries(queue_entries, run_monitor)
-            orphans.pop(execution_tag, None)
 
-        # now kill any remaining autoserv processes
-        for process in orphans.itervalues():
+            logging.info('Recovering %s entry %s (process %s)',
+                         status.lower(),
+                         ', '.join(str(entry) for entry in queue_entries),
+                         run_monitor.get_process())
+            recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
+            orphans.discard(run_monitor.get_process())
+
+
+    def _kill_remaining_orphan_processes(self, orphans):
+        for process in orphans:
             logging.info('Killing orphan %s', process)
             _drone_manager.kill_process(process)
 
 
-    def _recover_aborting_entries(self):
-        queue_entries = HostQueueEntry.fetch(
-            where='status IN ("Abort", "Aborting")')
-        for queue_entry in queue_entries:
-            logging.info('Recovering aborting QE %s', queue_entry)
-            agent = queue_entry.abort(self)
+    def _recover_running_entries(self, orphans):
+        def recover_entries(job, queue_entries, run_monitor):
+            if run_monitor is not None:
+                queue_task = RecoveryQueueTask(job=job,
+                                               queue_entries=queue_entries,
+                                               run_monitor=run_monitor)
+                self.add_agent(Agent(tasks=[queue_task],
+                                     num_processes=len(queue_entries)))
+            # else, _requeue_other_active_entries will cover this
+
+        self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
+                                          orphans, '.autoserv_execute',
+                                          recover_entries)
+
+
+    def _recover_gathering_entries(self, orphans):
+        def recover_entries(job, queue_entries, run_monitor):
+            gather_task = GatherLogsTask(job, queue_entries,
+                                         run_monitor=run_monitor)
+            self.add_agent(Agent([gather_task]))
+
+        self._recover_entries_with_status(
+            models.HostQueueEntry.Status.GATHERING,
+            orphans, _CRASHINFO_PID_FILE, recover_entries)
+
+
+    def _recover_parsing_entries(self, orphans):
+        def recover_entries(job, queue_entries, run_monitor):
+            reparse_task = FinalReparseTask(queue_entries,
+                                            run_monitor=run_monitor)
+            self.add_agent(Agent([reparse_task], num_processes=0))
+
+        self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
+                                          orphans, _PARSER_PID_FILE,
+                                          recover_entries)
+
+
+    def _recover_all_recoverable_entries(self):
+        orphans = _drone_manager.get_orphaned_autoserv_processes()
+        self._recover_running_entries(orphans)
+        self._recover_gathering_entries(orphans)
+        self._recover_parsing_entries(orphans)
+        self._kill_remaining_orphan_processes(orphans)
 
 
     def _requeue_other_active_entries(self):
         queue_entries = HostQueueEntry.fetch(
-            where='active AND NOT complete AND status != "Pending"')
+            where='active AND NOT complete AND '
+                  '(aborted OR status != "Pending")')
         for queue_entry in queue_entries:
             if self.get_agents_for_entry(queue_entry):
                 # entry has already been recovered
                 continue
-            logging.info('Requeuing active QE %s (status=%s)', queue_entry, 
+            if queue_entry.aborted:
+                queue_entry.abort(self)
+                continue
+
+            logging.info('Requeuing active QE %s (status=%s)', queue_entry,
                          queue_entry.status)
             if queue_entry.host:
                 tasks = queue_entry.host.reverify_tasks()
@@ -783,21 +804,6 @@
             self.add_agent(Agent(tasks))
 
 
-    def _recover_parsing_entries(self):
-        recovered_entry_ids = set()
-        for entry in HostQueueEntry.fetch(where='status = "Parsing"'):
-            if entry.id in recovered_entry_ids:
-                continue
-            queue_entries = entry.job.get_group_entries(entry)
-            recovered_entry_ids = recovered_entry_ids.union(
-                entry.id for entry in queue_entries)
-            logging.info('Recovering parsing entries %s', 
-                         (', '.join(str(entry) for entry in queue_entries)))
-
-            reparse_task = FinalReparseTask(queue_entries)
-            self.add_agent(Agent([reparse_task], num_processes=0))
-
-
     def _recover_hosts(self):
         # recover "Repair Failed" hosts
         message = 'Reverifying dead host %s'
@@ -883,12 +889,10 @@
 
 
     def _find_aborting(self):
-        for entry in queue_entries_to_abort():
-            agents_to_abort = list(self.get_agents_for_entry(entry))
-            for agent in agents_to_abort:
-                self.remove_agent(agent)
-
-            entry.abort(self, agents_to_abort)
+        for entry in HostQueueEntry.fetch(where='aborted and not complete'):
+            for agent in self.get_agents_for_entry(entry):
+                agent.abort()
+            entry.abort(self)
 
 
     def _can_start_agent(self, agent, num_started_this_cycle,
@@ -971,13 +975,15 @@
             command = ['nice', '-n', str(nice_level)] + command
         self._set_start_time()
         self.pidfile_id = _drone_manager.execute_command(
-            command, working_directory, log_file=log_file,
-            pidfile_name=pidfile_name, paired_with_pidfile=paired_with_pidfile)
+            command, working_directory, pidfile_name=pidfile_name,
+            log_file=log_file, paired_with_pidfile=paired_with_pidfile)
 
 
-    def attach_to_existing_process(self, execution_tag):
+    def attach_to_existing_process(self, execution_tag,
+                                   pidfile_name=_AUTOSERV_PID_FILE):
         self._set_start_time()
-        self.pidfile_id = _drone_manager.get_pidfile_id_from(execution_tag)
+        self.pidfile_id = _drone_manager.get_pidfile_id_from(
+            execution_tag, pidfile_name=pidfile_name)
         _drone_manager.register_pidfile(self.pidfile_id)
 
 
@@ -1094,7 +1100,7 @@
 class Agent(object):
     def __init__(self, tasks, num_processes=1):
         self.active_task = None
-        self.queue = Queue.Queue(0)
+        self.queue = None
         self.dispatcher = None
         self.num_processes = num_processes
 
@@ -1102,10 +1108,15 @@
                                                for task in tasks)
         self.host_ids = self._union_ids(task.host_ids for task in tasks)
 
+        self._clear_queue()
         for task in tasks:
             self.add_task(task)
 
 
+    def _clear_queue(self):
+        self.queue = Queue.Queue(0)
+
+
     def _union_ids(self, id_lists):
         return set(itertools.chain(*id_lists))
 
@@ -1128,7 +1139,6 @@
         logging.info("agent picking task")
         if self.active_task:
             assert self.active_task.is_done()
-
             if not self.active_task.success:
                 self.on_task_failure()
 
@@ -1140,7 +1150,7 @@
 
 
     def on_task_failure(self):
-        self.queue = Queue.Queue(0)
+        self._clear_queue()
         # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
         # get reset.
         new_agent = Agent(self.active_task.failure_tasks)
@@ -1157,12 +1167,19 @@
 
     def start(self):
         assert self.dispatcher
-
         self._next_task()
 
 
+    def abort(self):
+        if self.active_task:
+            self.active_task.abort()
+            self.active_task = None
+            self._clear_queue()
+
+
 class AgentTask(object):
-    def __init__(self, cmd, working_directory=None, failure_tasks=[]):
+    def __init__(self, cmd, working_directory=None, failure_tasks=[],
+                 pidfile_name=None, paired_with_pidfile=None):
         self.done = False
         self.failure_tasks = failure_tasks
         self.started = False
@@ -1172,6 +1189,7 @@
         self.agent = None
         self.monitor = None
         self.success = None
+        self.aborted = False
         self.queue_entry_ids = []
         self.host_ids = []
         self.log_file = None
@@ -1246,6 +1264,7 @@
         if self.monitor:
             self.monitor.kill()
         self.done = True
+        self.aborted = True
         self.cleanup()
 
 
@@ -1277,12 +1296,14 @@
         self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
 
 
-    def run(self):
+    def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
         if self.cmd:
             self.monitor = PidfileRunMonitor()
             self.monitor.run(self.cmd, self._working_directory,
                              nice_level=AUTOSERV_NICE_LEVEL,
-                             log_file=self.log_file)
+                             log_file=self.log_file,
+                             pidfile_name=pidfile_name,
+                             paired_with_pidfile=paired_with_pidfile)
 
 
 class RepairTask(AgentTask):
@@ -1461,10 +1482,13 @@
                                            [_LOST_PROCESS_ERROR])
 
 
-    def _finish_task(self, success):
+    def _finish_task(self):
+        # both of these conditionals can be true, iff the process ran, wrote a
+        # pid to its pidfile, and then exited without writing an exit code
         if self.monitor.has_process():
             self._write_keyval_after_job("job_finished", int(time.time()))
-            self._copy_and_parse_results(self.queue_entries)
+            gather_task = GatherLogsTask(self.job, self.queue_entries)
+            self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
 
         if self.monitor.lost_process:
             self._write_lost_process_error_file()
@@ -1512,34 +1536,13 @@
     def abort(self):
         super(QueueTask, self).abort()
         self._log_abort()
-        self._finish_task(False)
-
-
-    def _reboot_hosts(self):
-        reboot_after = self.job.reboot_after
-        do_reboot = False
-        if reboot_after == models.RebootAfter.ALWAYS:
-            do_reboot = True
-        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
-            num_tests_failed = self.monitor.num_tests_failed()
-            do_reboot = (self.success and num_tests_failed == 0)
-
-        for queue_entry in self.queue_entries:
-            if do_reboot:
-                # don't pass the queue entry to the CleanupTask. if the cleanup
-                # fails, the job doesn't care -- it's over.
-                cleanup_task = CleanupTask(host=queue_entry.get_host())
-                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
-            else:
-                queue_entry.host.set_status('Ready')
+        self._finish_task()
 
 
     def epilog(self):
         super(QueueTask, self).epilog()
-        self._finish_task(self.success)
-        self._reboot_hosts()
-
-        logging.info("queue_task finished with succes=%s", self.success)
+        self._finish_task()
+        logging.info("queue_task finished with success=%s", self.success)
 
 
 class RecoveryQueueTask(QueueTask):
@@ -1557,6 +1560,149 @@
         pass
 
 
+class PostJobTask(AgentTask):
+    def __init__(self, queue_entries, pidfile_name, logfile_name,
+                 run_monitor=None):
+        """
+        If run_monitor != None, we're recovering a running task.
+        """
+        self._queue_entries = queue_entries
+        self._pidfile_name = pidfile_name
+        self._run_monitor = run_monitor
+
+        self._execution_tag = self._get_consistent_execution_tag(queue_entries)
+        self._results_dir = _drone_manager.absolute_path(self._execution_tag)
+        self._autoserv_monitor = PidfileRunMonitor()
+        self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
+        self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
+
+        if _testing_mode:
+            command = 'true'
+        else:
+            command = self._generate_command(self._results_dir)
+
+        super(PostJobTask, self).__init__(cmd=command,
+                                          working_directory=self._execution_tag)
+
+        self.log_file = os.path.join(self._execution_tag, logfile_name)
+        self._final_status = self._determine_final_status()
+
+
+    def _generate_command(self, results_dir):
+        raise NotImplementedError('Subclasses must override this')
+
+
+    def _job_was_aborted(self):
+        was_aborted = None
+        for queue_entry in self._queue_entries:
+            queue_entry.update_from_database()
+            if was_aborted is None: # first queue entry
+                was_aborted = bool(queue_entry.aborted)
+            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
+                email_manager.manager.enqueue_notify_email(
+                    'Inconsistent abort state',
+                    'Queue entries have inconsistent abort state: ' +
+                    ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
+                # don't crash here, just assume true
+                return True
+        return was_aborted
+
+
+    def _determine_final_status(self):
+        if self._job_was_aborted():
+            return models.HostQueueEntry.Status.ABORTED
+
+        # we'll use a PidfileRunMonitor to read the autoserv exit status
+        if self._autoserv_monitor.exit_code() == 0:
+            return models.HostQueueEntry.Status.COMPLETED
+        return models.HostQueueEntry.Status.FAILED
+
+
+    def run(self):
+        if self._run_monitor is not None:
+            self.monitor = self._run_monitor
+        else:
+            # make sure we actually have results to work with.
+            # this should never happen in normal operation.
+            if not self._autoserv_monitor.has_process():
+                email_manager.manager.enqueue_notify_email(
+                    'No results in post-job task',
+                    'No results in post-job task at %s' %
+                    self._autoserv_monitor.pidfile_id)
+                self.finished(False)
+                return
+
+            super(PostJobTask, self).run(
+                pidfile_name=self._pidfile_name,
+                paired_with_pidfile=self._paired_with_pidfile)
+
+
+    def _set_all_statuses(self, status):
+        for queue_entry in self._queue_entries:
+            queue_entry.set_status(status)
+
+
+    def abort(self):
+        # override AgentTask.abort() to avoid killing the process and ending
+        # the task.  post-job tasks continue when the job is aborted.
+        pass
+
+
+class GatherLogsTask(PostJobTask):
+    """
+    Task responsible for
+    * gathering uncollected logs (if Autoserv crashed hard or was killed)
+    * copying logs to the results repository
+    * spawning CleanupTasks for hosts, if necessary
+    * spawning a FinalReparseTask for the job
+    """
+    def __init__(self, job, queue_entries, run_monitor=None):
+        self._job = job
+        super(GatherLogsTask, self).__init__(
+            queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
+            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
+        self._set_ids(queue_entries=queue_entries)
+
+
+    def _generate_command(self, results_dir):
+        host_list = ','.join(queue_entry.host.hostname
+                             for queue_entry in self._queue_entries)
+        return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
+                '-r', results_dir]
+
+
+    def prolog(self):
+        super(GatherLogsTask, self).prolog()
+        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)
+
+
+    def _reboot_hosts(self):
+        reboot_after = self._job.reboot_after
+        do_reboot = False
+        if reboot_after == models.RebootAfter.ALWAYS:
+            do_reboot = True
+        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
+            final_success = (
+                self._final_status == models.HostQueueEntry.Status.COMPLETED)
+            num_tests_failed = self._autoserv_monitor.num_tests_failed()
+            do_reboot = (final_success and num_tests_failed == 0)
+
+        for queue_entry in self._queue_entries:
+            if do_reboot:
+                # don't pass the queue entry to the CleanupTask. if the cleanup
+                # fails, the job doesn't care -- it's over.
+                cleanup_task = CleanupTask(host=queue_entry.host)
+                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
+            else:
+                queue_entry.host.set_status('Ready')
+
+
+    def epilog(self):
+        super(GatherLogsTask, self).epilog()
+        self._copy_and_parse_results(self._queue_entries)
+        self._reboot_hosts()
+
+
 class CleanupTask(PreJobTask):
     def __init__(self, host=None, queue_entry=None):
         assert bool(host) ^ bool(queue_entry)
@@ -1590,59 +1736,18 @@
             self.host.update_field('dirty', 0)
 
 
-class AbortTask(AgentTask):
-    def __init__(self, queue_entry, agents_to_abort):
-        super(AbortTask, self).__init__('')
-        self.queue_entry = queue_entry
-        # don't use _set_ids, since we don't want to set the host_ids
-        self.queue_entry_ids = [queue_entry.id]
-        self.agents_to_abort = agents_to_abort
-
-
-    def prolog(self):
-        logging.info("starting abort on host %s, job %s",
-                     self.queue_entry.host_id, self.queue_entry.job_id)
-
-
-    def epilog(self):
-        super(AbortTask, self).epilog()
-        self.queue_entry.set_status('Aborted')
-        self.success = True
-
-
-    def run(self):
-        for agent in self.agents_to_abort:
-            if (agent.active_task):
-                agent.active_task.abort()
-
-
-class FinalReparseTask(AgentTask):
+class FinalReparseTask(PostJobTask):
     _num_running_parses = 0
 
-    def __init__(self, queue_entries):
-        self._queue_entries = queue_entries
+    def __init__(self, queue_entries, run_monitor=None):
+        super(FinalReparseTask, self).__init__(queue_entries,
+                                               pidfile_name=_PARSER_PID_FILE,
+                                               logfile_name='.parse.log',
+                                               run_monitor=run_monitor)
         # don't use _set_ids, since we don't want to set the host_ids
         self.queue_entry_ids = [entry.id for entry in queue_entries]
         self._parse_started = False
 
-        assert len(queue_entries) > 0
-        queue_entry = queue_entries[0]
-
-        self._execution_tag = queue_entry.execution_tag()
-        self._results_dir = _drone_manager.absolute_path(self._execution_tag)
-        self._autoserv_monitor = PidfileRunMonitor()
-        self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
-        self._final_status = self._determine_final_status()
-
-        if _testing_mode:
-            self.cmd = 'true'
-        else:
-            super(FinalReparseTask, self).__init__(
-                cmd=self._generate_parse_command(),
-                working_directory=self._execution_tag)
-
-        self.log_file = os.path.join(self._execution_tag, '.parse.log')
-
 
     @classmethod
     def _increment_running_parses(cls):
@@ -1660,28 +1765,19 @@
                 scheduler_config.config.max_parse_processes)
 
 
-    def _determine_final_status(self):
-        # we'll use a PidfileRunMonitor to read the autoserv exit status
-        if self._autoserv_monitor.exit_code() == 0:
-            return models.HostQueueEntry.Status.COMPLETED
-        return models.HostQueueEntry.Status.FAILED
-
-
     def prolog(self):
         super(FinalReparseTask, self).prolog()
-        for queue_entry in self._queue_entries:
-            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
+        self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
 
 
     def epilog(self):
         super(FinalReparseTask, self).epilog()
-        for queue_entry in self._queue_entries:
-            queue_entry.set_status(self._final_status)
+        self._set_all_statuses(self._final_status)
 
 
-    def _generate_parse_command(self):
+    def _generate_command(self, results_dir):
         return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
-                self._results_dir]
+                results_dir]
 
 
     def poll(self):
@@ -1702,21 +1798,8 @@
         if not self._can_run_new_parse():
             return
 
-        # make sure we actually have results to parse
-        # this should never happen in normal operation
-        if not self._autoserv_monitor.has_process():
-            email_manager.manager.enqueue_notify_email(
-                'No results to parse',
-                'No results to parse at %s' % self._autoserv_monitor.pidfile_id)
-            self.finished(False)
-            return
-
         # actually run the parse command
-        self.monitor = PidfileRunMonitor()
-        self.monitor.run(self.cmd, self._working_directory,
-                         log_file=self.log_file,
-                         pidfile_name='.parser_execute',
-                         paired_with_pidfile=self._autoserv_monitor.pidfile_id)
+        super(FinalReparseTask, self).run()
 
         self._increment_running_parses()
         self._parse_started = True
@@ -1881,15 +1964,13 @@
         return int(rows[0][0])
 
 
-    def update_field(self, field, value, condition=''):
+    def update_field(self, field, value):
         assert field in self._valid_fields
 
         if getattr(self, field) == value:
             return
 
         query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
-        if condition:
-            query += ' AND (%s)' % condition
         _db.execute(query, (value, self.id))
 
         setattr(self, field, value)
@@ -2027,7 +2108,7 @@
     _table_name = 'host_queue_entries'
     _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
                'active', 'complete', 'deleted', 'execution_subdir',
-               'atomic_group_id')
+               'atomic_group_id', 'aborted')
 
 
     def __init__(self, id=None, row=None, **kwargs):
@@ -2120,13 +2201,7 @@
 
 
     def set_status(self, status):
-        abort_statuses = ['Abort', 'Aborting', 'Aborted']
-        if status not in abort_statuses:
-            condition = ' AND '.join(['status <> "%s"' % x
-                                      for x in abort_statuses])
-        else:
-            condition = ''
-        self.update_field('status', status, condition=condition)
+        self.update_field('status', status)
 
         logging.info("%s -> %s", self, self.status)
 
@@ -2135,7 +2210,7 @@
             self.update_field('active', False)
 
         if status in ['Pending', 'Running', 'Verifying', 'Starting',
-                      'Aborting']:
+                      'Gathering']:
             self.update_field('complete', False)
             self.update_field('active', True)
 
@@ -2261,14 +2336,24 @@
         return None
 
 
-    def abort(self, dispatcher, agents_to_abort=[]):
-        host = self.get_host()
-        if self.active and host:
-            dispatcher.add_agent(Agent(tasks=host.reverify_tasks()))
+    def abort(self, dispatcher):
+        assert self.aborted and not self.complete
 
-        abort_task = AbortTask(self, agents_to_abort)
-        self.set_status('Aborting')
-        dispatcher.add_agent(Agent(tasks=[abort_task], num_processes=0))
+        Status = models.HostQueueEntry.Status
+        has_running_job_agent = (
+            self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
+            and dispatcher.get_agents_for_entry(self))
+        if has_running_job_agent:
+            # do nothing; post-job tasks will finish and then mark this entry
+            # with status "Aborted" and take care of the host
+            return
+
+        if self.status in (Status.STARTING, Status.PENDING):
+            self.host.set_status(models.Host.Status.READY)
+        elif self.status == Status.VERIFYING:
+            dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
+
+        self.set_status(Status.ABORTED)
 
     def execution_tag(self):
         assert self.execution_subdir
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py
index 0bb9fbd..80436b8 100644
--- a/scheduler/monitor_db_unittest.py
+++ b/scheduler/monitor_db_unittest.py
@@ -53,6 +53,20 @@
         return 'row with id %s' % self.row_id
 
 
+class IsAgentWithTask(mock.argument_comparator):
+        def __init__(self, task):
+            self._task = task
+
+
+        def is_satisfied_by(self, parameter):
+            if not isinstance(parameter, monitor_db.Agent):
+                return False
+            tasks = list(parameter.queue.queue)
+            if len(tasks) != 1:
+                return False
+            return tasks[0] == self._task
+
+
 class BaseSchedulerTest(unittest.TestCase):
     _config_section = 'AUTOTEST_WEB'
     _test_db_initialized = False
@@ -275,7 +289,7 @@
         self.god.stub_with(monitor_db, 'Job', MockJob)
         hqe = monitor_db.HostQueueEntry(
                 new_record=True,
-                row=[0, 1, 2, 'Queued', None, 0, 0, 0, '.', None])
+                row=[0, 1, 2, 'Queued', None, 0, 0, 0, '.', None, False])
         hqe.save()
         new_id = hqe.id
         # Force a re-query and verify that the correct data was stored.
@@ -928,16 +942,6 @@
     """
     Test the dispatcher abort functionality.
     """
-    def _check_abort_agent(self, agent, entry_id):
-        self.assert_(isinstance(agent, monitor_db.Agent))
-        tasks = list(agent.queue.queue)
-        self.assertEquals(len(tasks), 1)
-        abort = tasks[0]
-
-        self.assert_(isinstance(abort, monitor_db.AbortTask))
-        self.assertEquals(abort.queue_entry.id, entry_id)
-
-
     def _check_host_agent(self, agent, host_id):
         self.assert_(isinstance(agent, monitor_db.Agent))
         tasks = list(agent.queue.queue)
@@ -951,47 +955,39 @@
         self.assertEquals(verify.host.id, host_id)
 
 
-    def _check_agents(self, agents, include_host_tasks):
+    def _check_agents(self, agents):
         agents = list(agents)
-        if include_host_tasks:
-            self.assertEquals(len(agents), 4)
-            self._check_host_agent(agents.pop(0), 1)
-            self._check_host_agent(agents.pop(1), 2)
-
-        self.assertEquals(len(agents), 2)
-        self._check_abort_agent(agents[0], 1)
-        self._check_abort_agent(agents[1], 2)
+        self.assertEquals(len(agents), 3)
+        self.assertEquals(agents[0], self._agent)
+        self._check_host_agent(agents[1], 1)
+        self._check_host_agent(agents[2], 2)
 
 
-    def test_find_aborting_inactive(self):
+    def _common_setup(self):
         self._create_job(hosts=[1, 2])
-        self._update_hqe(set='status="Abort"')
+        self._update_hqe(set='aborted=1')
+        self._agent = self.god.create_mock_class(monitor_db.Agent, 'old_agent')
+        self._agent.host_ids = self._agent.queue_entry_ids = [1, 2]
+        self._agent.abort.expect_call()
+        self._agent.abort.expect_call() # gets called once for each HQE
+        self._dispatcher.add_agent(self._agent)
 
+
+    def test_find_aborting(self):
+        self._common_setup()
         self._dispatcher._find_aborting()
-
-        self._check_agents(self._dispatcher._agents, include_host_tasks=False)
         self.god.check_playback()
 
 
-    def test_find_aborting_active(self):
-        self._create_job(hosts=[1, 2])
-        self._update_hqe(set='status="Abort", active=1')
-        # have to make an Agent for the active HQEs
-        agent = self.god.create_mock_class(monitor_db.Agent, 'old_agent')
-        agent.host_ids = agent.queue_entry_ids = [1, 2]
-        self._dispatcher.add_agent(agent)
+    def test_find_aborting_verifying(self):
+        self._common_setup()
+        self._update_hqe(set='active=1, status="Verifying"')
 
         self._dispatcher._find_aborting()
 
-        self._check_agents(self._dispatcher._agents, include_host_tasks=True)
+        self._check_agents(self._dispatcher._agents)
         self.god.check_playback()
 
-        # ensure agent gets aborted
-        abort1 = self._dispatcher._agents[1].queue.queue[0]
-        self.assertEquals(abort1.agents_to_abort, [agent])
-        abort2 = self._dispatcher._agents[3].queue.queue[0]
-        self.assertEquals(abort2.agents_to_abort, [])
-
 
 class JobTimeoutTest(BaseSchedulerTest):
     def _test_synch_start_timeout_helper(self, expect_abort,
@@ -1021,10 +1017,7 @@
         cleanup._abort_jobs_past_synch_start_timeout()
 
         for hqe in job.hostqueueentry_set.all():
-            if expect_abort:
-                self.assert_(hqe.status in ('Abort', 'Aborted'), hqe.status)
-            else:
-                self.assert_(hqe.status not in ('Abort', 'Aborted'), hqe.status)
+            self.assertEquals(hqe.aborted, expect_abort)
 
 
     def test_synch_start_timeout_helper(self):
@@ -1052,8 +1045,10 @@
 
         self.pidfile_id = object()
 
-        self.mock_drone_manager.get_pidfile_id_from.expect_call(
-            self.execution_tag).and_return(self.pidfile_id)
+        (self.mock_drone_manager.get_pidfile_id_from
+             .expect_call(self.execution_tag,
+                          pidfile_name=monitor_db._AUTOSERV_PID_FILE)
+             .and_return(self.pidfile_id))
         self.mock_drone_manager.register_pidfile.expect_call(self.pidfile_id)
 
         self.monitor = monitor_db.PidfileRunMonitor()
@@ -1223,26 +1218,13 @@
         return task
 
 
-    class IsAgentWithTaskComparator(mock.argument_comparator):
-        def __init__(self, task):
-            self._task = task
-
-
-        def is_satisfied_by(self, parameter):
-            if not isinstance(parameter, monitor_db.Agent):
-                return False
-            tasks = list(parameter.queue.queue)
-            if len(tasks) != 1:
-                return False
-            return tasks[0] == self._task
-
-
     def test_agent(self):
         task1 = self._create_mock_task('task1')
         task2 = self._create_mock_task('task2')
         task3 = self._create_mock_task('task3')
-        dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,
-                                                'dispatcher')
+        self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,
+                                                      'dispatcher')
+
 
         task1.start.expect_call()
         task1.is_done.expect_call().and_return(False)
@@ -1257,10 +1239,10 @@
         task2.success = False
         task2.failure_tasks = [task3]
 
-        dispatcher.add_agent.expect_call(self.IsAgentWithTaskComparator(task3))
+        self._dispatcher.add_agent.expect_call(IsAgentWithTask(task3))
 
         agent = monitor_db.Agent([task1, task2])
-        agent.dispatcher = dispatcher
+        agent.dispatcher = self._dispatcher
         agent.start()
         while not agent.is_done():
             agent.tick()
@@ -1311,6 +1293,9 @@
         self.queue_entry.job = self.job
         self.queue_entry.host = self.host
         self.queue_entry.meta_host = None
+        self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,
+                                                      'dispatcher')
+
 
 
     def tearDown(self):
@@ -1343,7 +1328,9 @@
             mock.is_instance_comparator(list),
             'tempdir',
             nice_level=monitor_db.AUTOSERV_NICE_LEVEL,
-            log_file=mock.anything_comparator())
+            log_file=mock.anything_comparator(),
+            pidfile_name=monitor_db._AUTOSERV_PID_FILE,
+            paired_with_pidfile=None)
         monitor_db.PidfileRunMonitor.exit_code.expect_call()
         monitor_db.PidfileRunMonitor.exit_code.expect_call().and_return(
             exit_status)
@@ -1403,10 +1390,8 @@
     def test_repair_task_with_queue_entry(self):
         self.god.stub_class(monitor_db, 'FinalReparseTask')
         self.god.stub_class(monitor_db, 'Agent')
-        dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,
-                                                'dispatcher')
         agent = DummyAgent()
-        agent.dispatcher = dispatcher
+        agent.dispatcher = self._dispatcher
 
         self.host.set_status.expect_call('Repairing')
         self.queue_entry.requeue.expect_call()
@@ -1421,7 +1406,7 @@
             [self.queue_entry])
         reparse_agent = monitor_db.Agent.expect_new([reparse_task],
                                                     num_processes=0)
-        dispatcher.add_agent.expect_call(reparse_agent)
+        self._dispatcher.add_agent.expect_call(reparse_agent)
         self.queue_entry.handle_host_failure.expect_call()
 
         task = monitor_db.RepairTask(self.host, self.queue_entry)
@@ -1492,24 +1477,7 @@
         self.test_verify_task_with_queue_entry()
 
 
-    def test_abort_task(self):
-        queue_entry = self.god.create_mock_class(monitor_db.HostQueueEntry,
-                                                 'queue_entry')
-        queue_entry.id = 1
-        queue_entry.host_id, queue_entry.job_id = 1, 2
-        task = self.god.create_mock_class(monitor_db.AgentTask, 'task')
-        agent = self.god.create_mock_class(monitor_db.Agent, 'agent')
-        agent.active_task = task
-
-        task.abort.expect_call()
-        queue_entry.set_status.expect_call('Aborted')
-
-        abort_task = monitor_db.AbortTask(queue_entry, [agent])
-        self.run_task(abort_task, True)
-        self.god.check_playback()
-
-
-    def _setup_pre_parse_expects(self, autoserv_success):
+    def _setup_post_job_task_expects(self, autoserv_success, hqe_status):
         self.queue_entry.execution_tag.expect_call().and_return('tag')
         self.pidfile_monitor = monitor_db.PidfileRunMonitor.expect_new()
         self.pidfile_monitor.pidfile_id = self.PIDFILE_ID
@@ -1518,9 +1486,15 @@
             code = 0
         else:
             code = 1
+        self.queue_entry.update_from_database.expect_call()
+        self.queue_entry.aborted = False
         self.pidfile_monitor.exit_code.expect_call().and_return(code)
 
-        self.queue_entry.set_status.expect_call('Parsing')
+        self.queue_entry.set_status.expect_call(hqe_status)
+
+
+    def _setup_pre_parse_expects(self, autoserv_success):
+        self._setup_post_job_task_expects(autoserv_success, 'Parsing')
 
 
     def _setup_post_parse_expects(self, autoserv_success):
@@ -1531,26 +1505,31 @@
         self.queue_entry.set_status.expect_call(status)
 
 
-    def setup_reparse_run_monitor(self):
+    def _setup_post_job_run_monitor(self, pidfile_name):
         self.pidfile_monitor.has_process.expect_call().and_return(True)
         autoserv_pidfile_id = object()
-        monitor = monitor_db.PidfileRunMonitor.expect_new()
-        monitor.run.expect_call(
+        self.monitor = monitor_db.PidfileRunMonitor.expect_new()
+        self.monitor.run.expect_call(
             mock.is_instance_comparator(list),
             'tag',
+            nice_level=monitor_db.AUTOSERV_NICE_LEVEL,
             log_file=mock.anything_comparator(),
-            pidfile_name='.parser_execute',
+            pidfile_name=pidfile_name,
             paired_with_pidfile=self.PIDFILE_ID)
-        monitor.exit_code.expect_call()
-        monitor.exit_code.expect_call().and_return(0)
-        monitor.get_process.expect_call().and_return(self.DUMMY_PROCESS)
+        self.monitor.exit_code.expect_call()
+        self.monitor.exit_code.expect_call().and_return(0)
+        self._expect_copy_results()
+
+
+    def _expect_copy_results(self):
+        self.monitor.get_process.expect_call().and_return(self.DUMMY_PROCESS)
         drone_manager.DroneManager.copy_to_results_repository.expect_call(
                 self.DUMMY_PROCESS, mock.is_string_comparator())
 
 
     def _test_final_reparse_task_helper(self, autoserv_success=True):
         self._setup_pre_parse_expects(autoserv_success)
-        self.setup_reparse_run_monitor()
+        self._setup_post_job_run_monitor(monitor_db._PARSER_PID_FILE)
         self._setup_post_parse_expects(autoserv_success)
 
         task = monitor_db.FinalReparseTask([self.queue_entry])
@@ -1578,7 +1557,7 @@
             False)
         monitor_db.FinalReparseTask._can_run_new_parse.expect_call().and_return(
             True)
-        self.setup_reparse_run_monitor()
+        self._setup_post_job_run_monitor(monitor_db._PARSER_PID_FILE)
         self._setup_post_parse_expects(True)
 
         task = monitor_db.FinalReparseTask([self.queue_entry])
@@ -1586,6 +1565,29 @@
         self.god.check_playback()
 
 
+    def test_gather_logs_task(self):
+        self.god.stub_class(monitor_db, 'PidfileRunMonitor')
+        self.god.stub_class(monitor_db, 'FinalReparseTask')
+        self._setup_post_job_task_expects(True, 'Gathering')
+        self._setup_post_job_run_monitor('.collect_crashinfo_execute')
+        self.queue_entry.execution_tag.expect_call().and_return('tag')
+        self._expect_copy_results()
+        parse_task = monitor_db.FinalReparseTask.expect_new([self.queue_entry])
+        parse_task.host_ids = parse_task.queue_entry_ids = []
+        self._dispatcher.add_agent.expect_call(IsAgentWithTask(parse_task))
+
+        # TODO(showard): add tests for rebooting code
+        self.job.reboot_after = models.RebootAfter.NEVER
+        self.host.set_status.expect_call('Ready')
+
+        task = monitor_db.GatherLogsTask(self.job, [self.queue_entry])
+        task.agent = DummyAgent()
+        task.agent.dispatcher = self._dispatcher
+        self.run_task(task, True)
+
+        self.god.check_playback()
+
+
     def _test_cleanup_task_helper(self, success, use_queue_entry=False):
         if use_queue_entry:
             self.queue_entry.get_host.expect_call().and_return(self.host)
diff --git a/server/autoserv b/server/autoserv
index 81cd8dd..b8c69e5 100755
--- a/server/autoserv
+++ b/server/autoserv
@@ -189,7 +189,11 @@
 
 
     if parser.options.write_pidfile:
-        pid_file_manager = pidfile.PidFileManager("autoserv", results)
+        if parser.options.collect_crashinfo:
+            pidfile_label = 'collect_crashinfo'
+        else:
+            pidfile_label = 'autoserv'
+        pid_file_manager = pidfile.PidFileManager(pidfile_label, results)
         pid_file_manager.open_file()
     else:
         pid_file_manager = None