Make SpecialTasks recoverable. This involves quite a few changes.
* Run tasks in deterministic dirs instead of temp dirs. The dir paths look like hosts/<hostname>/<task id>-<task name>, for example hosts/myhost/4-verify; the ID comes from the SpecialTask DB row. This allows us to find the pidfile when we go looking for it during recovery, and it makes it simple to find the logs for any given special task, much like for HostQueueEntries. Added SpecialTask.execution_path() for this purpose, and added a test for it to models_test (see the path sketch after this list).
* Added execution_path() to HostQueueEntry to match the interface of SpecialTask, allowing for more polymorphism, and changed most call sites to use it.
* Since we're running in these dirs, copy the full results back from these dirs, instead of copying just a single log file.
* Move process recovery code up into AgentTask, so that all AgentTasks can share the same generic process recovery code.
* Change the SpecialTask recovery code to do process recovery.
* Change how VerifyTask handles multiple pending verify requests for a machine: instead of updating all the requests, just delete all the other tasks. They're not specially tracked in any way, so it's simplest to delete them.
* Special tasks are now marked is_active=False when they complete, to be consistent with HQEs.
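
As a rough illustration of the new directory layout (a sketch only, not the
actual model code; it assumes the SpecialTask row exposes id, task, and a
related host with a hostname):

    import os

    def special_task_execution_path(special_task):
        # e.g. hosts/myhost/4-verify for SpecialTask id 4 of type Verify
        return os.path.join('hosts', special_task.host.hostname,
                            '%s-%s' % (special_task.id,
                                       special_task.task.lower()))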

Other changes:
* Added null=True to the SpecialTask.time_started definition.
* Made EmailManager.enqueue_notify_email always log the message, and removed the explicit logging calls from call sites.
* Added a feature to DroneManager.execute_command() to automatically substitute the working directory into the command. This avoids passing some duplicate information around and simplifies the unit test (see the sketch below).
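
A hedged sketch of the substitution idea (names here are illustrative, not the
real DroneManager internals): callers embed a placeholder in the command and
execute_command() swaps in the absolute working directory, so the path no
longer needs to be passed around separately.

    # illustrative sentinel; the real constant lives in drone_manager
    WORKING_DIRECTORY = object()

    def substitute_working_directory(command, abs_working_directory):
        # e.g. ['autoserv', '-r', WORKING_DIRECTORY, '-m', 'myhost'] becomes
        #      ['autoserv', '-r', '/results/hosts/myhost/4-verify', '-m', 'myhost']
        return [abs_working_directory if arg is WORKING_DIRECTORY else arg
                for arg in command]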

Signed-off-by: Steve Howard <showard@google.com>


git-svn-id: http://test.kernel.org/svn/autotest/trunk@3380 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 2e4c4b0..eae30fb 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -211,14 +211,13 @@
     logging.info("Connected! Running...")
 
 
-def _autoserv_command_line(machines, results_dir, extra_args, job=None,
-                           queue_entry=None, verbose=True):
+def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
+                           verbose=True):
     """
     @returns The autoserv command line as a list of executable + parameters.
 
     @param machines - string - A machine or comma separated list of machines
             for the (-m) flag.
-    @param results_dir - string - Where the results will be written (-r).
     @param extra_args - list - Additional arguments to pass to autoserv.
     @param job - Job object - If supplied, -u owner and -l name parameters
             will be added.
@@ -226,7 +225,7 @@
             object was supplied, this will be used to lookup the Job object.
     """
     autoserv_argv = [_autoserv_path, '-p', '-m', machines,
-                     '-r', _drone_manager.absolute_path(results_dir)]
+                     '-r', drone_manager.WORKING_DIRECTORY]
     if job or queue_entry:
         if not job:
             job = queue_entry.job
@@ -704,13 +703,24 @@
         # parsing entries
         queue_entries = HostQueueEntry.fetch(
             where="status IN ('Running', 'Gathering', 'Parsing')")
-        for queue_entry in queue_entries:
+        special_tasks = models.SpecialTask.objects.filter(is_active=True)
+        for execution_entry in itertools.chain(queue_entries, special_tasks):
             for pidfile_name in _ALL_PIDFILE_NAMES:
                 pidfile_id = _drone_manager.get_pidfile_id_from(
-                    queue_entry.execution_tag(), pidfile_name=pidfile_name)
+                    execution_entry.execution_path(), pidfile_name=pidfile_name)
                 _drone_manager.register_pidfile(pidfile_id)
 
 
+    def _get_recovery_run_monitor(self, execution_path, pidfile_name, orphans):
+        run_monitor = PidfileRunMonitor()
+        run_monitor.attach_to_existing_process(execution_path,
+                                               pidfile_name=pidfile_name)
+        if run_monitor.has_process():
+            orphans.discard(run_monitor.get_process())
+            return run_monitor, '(process %s)' % run_monitor.get_process()
+        return None, 'without process'
+
+
     def _recover_entries_with_status(self, status, orphans, pidfile_name,
                                      recover_entries_fn):
         queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
@@ -719,24 +729,13 @@
                 # synchronous job we've already recovered
                 continue
             queue_entries = queue_entry.job.get_group_entries(queue_entry)
-            execution_tag = queue_entry.execution_tag()
-            run_monitor = PidfileRunMonitor()
-            run_monitor.attach_to_existing_process(execution_tag,
-                                                   pidfile_name=pidfile_name)
+            run_monitor, process_string = self._get_recovery_run_monitor(
+                    queue_entry.execution_path(), pidfile_name, orphans)
 
-            log_message = ('Recovering %s entry %s ' %
-                           (status.lower(),
-                            ', '.join(str(entry) for entry in queue_entries)))
-            if not run_monitor.has_process():
-                # execution apparently never happened
-                logging.info(log_message + 'without process')
-                recover_entries_fn(queue_entry.job, queue_entries, None)
-                continue
-
-            logging.info(log_message + '(process %s)',
-                         run_monitor.get_process())
+            logging.info('Recovering %s entry %s %s', status.lower(),
+                         ', '.join(str(entry) for entry in queue_entries),
+                         process_string)
             recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
-            orphans.discard(run_monitor.get_process())
 
 
     def _kill_remaining_orphan_processes(self, orphans):
@@ -748,22 +747,21 @@
     def _recover_running_entries(self, orphans):
         def recover_entries(job, queue_entries, run_monitor):
             if run_monitor is not None:
-                queue_task = RecoveryQueueTask(job=job,
-                                               queue_entries=queue_entries,
-                                               run_monitor=run_monitor)
+                queue_task = QueueTask(job=job, queue_entries=queue_entries,
+                                       recover_run_monitor=run_monitor)
                 self.add_agent(Agent(tasks=[queue_task],
                                      num_processes=len(queue_entries)))
             # else, _requeue_other_active_entries will cover this
 
         self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
-                                          orphans, '.autoserv_execute',
+                                          orphans, _AUTOSERV_PID_FILE,
                                           recover_entries)
 
 
     def _recover_gathering_entries(self, orphans):
         def recover_entries(job, queue_entries, run_monitor):
             gather_task = GatherLogsTask(job, queue_entries,
-                                         run_monitor=run_monitor)
+                                         recover_run_monitor=run_monitor)
             self.add_agent(Agent([gather_task]))
 
         self._recover_entries_with_status(
@@ -774,7 +772,7 @@
     def _recover_parsing_entries(self, orphans):
         def recover_entries(job, queue_entries, run_monitor):
             reparse_task = FinalReparseTask(queue_entries,
-                                            run_monitor=run_monitor)
+                                            recover_run_monitor=run_monitor)
             self.add_agent(Agent([reparse_task], num_processes=0))
 
         self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
@@ -787,11 +785,11 @@
         self._recover_running_entries(orphans)
         self._recover_gathering_entries(orphans)
         self._recover_parsing_entries(orphans)
-        self._recover_special_tasks()
+        self._recover_special_tasks(orphans)
         self._kill_remaining_orphan_processes(orphans)
 
 
-    def _recover_special_tasks(self):
+    def _recover_special_tasks(self, orphans):
         """\
         Recovers all special tasks that have started running but have not
         completed.
@@ -801,31 +799,33 @@
                                                   is_complete=False)
         # Use ordering to force NULL queue_entry_id's to the end of the list
         for task in tasks.order_by('-queue_entry_id'):
-            if self.host_has_agent(task.host):
-                # Duplicated verify task that we've already recovered
-                continue
-
-            logging.info("Recovering %s", task)
+            assert not self.host_has_agent(task.host)
 
             host = Host(id=task.host.id)
             queue_entry = None
             if task.queue_entry:
-                queue_entry = HostQueueEntry.fetch(
-                    where='id = %s', params=(task.queue_entry.id,)).next()
+                queue_entry = HostQueueEntry(id=task.queue_entry.id)
 
-            self._recover_special_task(task, host, queue_entry)
+            run_monitor, process_string = self._get_recovery_run_monitor(
+                    task.execution_path(), _AUTOSERV_PID_FILE, orphans)
+
+            logging.info('Recovering %s %s', task, process_string)
+            self._recover_special_task(task, host, queue_entry, run_monitor)
 
 
-    def _recover_special_task(self, task, host, queue_entry):
+    def _recover_special_task(self, task, host, queue_entry, run_monitor):
         """\
         Recovers a single special task.
         """
         if task.task == models.SpecialTask.Task.VERIFY:
-            agent_tasks = self._recover_verify(task, host, queue_entry)
+            agent_tasks = self._recover_verify(task, host, queue_entry,
+                                               run_monitor)
         elif task.task == models.SpecialTask.Task.REPAIR:
-            agent_tasks = self._recover_repair(task, host, queue_entry)
+            agent_tasks = self._recover_repair(task, host, queue_entry,
+                                               run_monitor)
         elif task.task == models.SpecialTask.Task.CLEANUP:
-            agent_tasks = self._recover_cleanup(task, host, queue_entry)
+            agent_tasks = self._recover_cleanup(task, host, queue_entry,
+                                                run_monitor)
         else:
             # Should never happen
             logging.error(
@@ -834,7 +834,7 @@
         self.add_agent(Agent(agent_tasks))
 
 
-    def _recover_verify(self, task, host, queue_entry):
+    def _recover_verify(self, task, host, queue_entry, run_monitor):
         """\
         Recovers a verify task.
         No associated queue entry: Verify host
@@ -842,21 +842,24 @@
                                      entry
         """
         if not task.queue_entry:
-            return [VerifyTask(host=host, task=task)]
+            return [VerifyTask(host=host, task=task,
+                               recover_run_monitor=run_monitor)]
         else:
-            return [VerifyTask(queue_entry=queue_entry, task=task),
+            return [VerifyTask(queue_entry=queue_entry, task=task,
+                               recover_run_monitor=run_monitor),
                     SetEntryPendingTask(queue_entry=queue_entry)]
 
 
-    def _recover_repair(self, task, host, queue_entry):
+    def _recover_repair(self, task, host, queue_entry, run_monitor):
         """\
         Recovers a repair task.
         Always repair host
         """
-        return [RepairTask(host=host, queue_entry=queue_entry, task=task)]
+        return [RepairTask(host=host, queue_entry=queue_entry, task=task,
+                           recover_run_monitor=run_monitor)]
 
 
-    def _recover_cleanup(self, task, host, queue_entry):
+    def _recover_cleanup(self, task, host, queue_entry, run_monitor):
         """\
         Recovers a cleanup task.
         No associated queue entry: Clean host
@@ -864,10 +867,12 @@
                                      run associated queue entry
         """
         if not task.queue_entry:
-            return [CleanupTask(host=host, task=task)]
+            return [CleanupTask(host=host, task=task,
+                                recover_run_monitor=run_monitor)]
         else:
             agent_tasks = [CleanupTask(queue_entry=queue_entry,
-                                       task=task)]
+                                       task=task,
+                                       recover_run_monitor=run_monitor)]
             if queue_entry.job.should_run_verify(queue_entry):
                 agent_tasks.append(VerifyTask(queue_entry=queue_entry))
             agent_tasks.append(
@@ -903,20 +908,13 @@
 
 
     def _reverify_remaining_hosts(self):
-        # reverify hosts that were in the middle of verify, repair or cleanup
-        self._reverify_hosts_where("""(status = 'Repairing' OR
-                                       status = 'Verifying' OR
-                                       status = 'Cleaning')""")
-
-        # recover "Running" hosts with no active queue entries, although this
+        # recover active hosts that have not yet been recovered, although this
         # should never happen
-        message = ('Recovering running host %s - this probably indicates a '
+        message = ('Recovering active host %s - this probably indicates a '
                    'scheduler bug')
-        self._reverify_hosts_where("""status = 'Running' AND
-                                      id NOT IN (SELECT host_id
-                                                 FROM host_queue_entries
-                                                 WHERE active)""",
-                                   print_message=message)
+        self._reverify_hosts_where(
+                "status IN ('Repairing', 'Verifying', 'Cleaning', 'Running')",
+                print_message=message)
 
 
     def _reverify_hosts_where(self, where,
@@ -1149,11 +1147,11 @@
             log_file=log_file, paired_with_pidfile=paired_with_pidfile)
 
 
-    def attach_to_existing_process(self, execution_tag,
+    def attach_to_existing_process(self, execution_path,
                                    pidfile_name=_AUTOSERV_PID_FILE):
         self._set_start_time()
         self.pidfile_id = _drone_manager.get_pidfile_id_from(
-            execution_tag, pidfile_name=pidfile_name)
+            execution_path, pidfile_name=pidfile_name)
         _drone_manager.register_pidfile(self.pidfile_id)
 
 
@@ -1187,7 +1185,6 @@
     def _handle_pidfile_error(self, error, message=''):
         message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
             self._state.process, self.pidfile_id, message)
-        logging.info(message)
         email_manager.manager.enqueue_notify_email(error, message)
         self.on_lost_process(self._state.process)
 
@@ -1234,7 +1231,6 @@
         Called when no pidfile is found or no pid is in the pidfile.
         """
         message = 'No pid found at %s' % self.pidfile_id
-        logging.info(message)
         if time.time() - self._start_time > PIDFILE_TIMEOUT:
             email_manager.manager.enqueue_notify_email(
                 'Process has failed to write pidfile', message)
@@ -1436,16 +1432,16 @@
 
 
 class AgentTask(object):
-    def __init__(self, cmd, working_directory=None, failure_tasks=[],
-                 pidfile_name=None, paired_with_pidfile=None):
+    def __init__(self, cmd=None, working_directory=None, failure_tasks=[],
+                 pidfile_name=None, paired_with_pidfile=None,
+                 recover_run_monitor=None):
         self.done = False
         self.failure_tasks = failure_tasks
-        self.started = False
         self.cmd = cmd
         self._working_directory = working_directory
-        self.task = None
         self.agent = None
-        self.monitor = None
+        self.monitor = recover_run_monitor
+        self.started = bool(recover_run_monitor)
         self.success = None
         self.aborted = False
         self.queue_entry_ids = []
@@ -1493,11 +1489,7 @@
 
 
     def prolog(self):
-        pass
-
-
-    def create_temp_resultsdir(self, suffix=''):
-        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')
+        assert not self.monitor
 
 
     def cleanup(self):
@@ -1528,30 +1520,28 @@
         self.cleanup()
 
 
-    def set_host_log_file(self, base_name, host):
-        filename = '%s.%s' % (time.time(), base_name)
-        self.log_file = os.path.join('hosts', host.hostname, filename)
+    def _get_consistent_execution_path(self, execution_entries):
+        first_execution_path = execution_entries[0].execution_path()
+        for execution_entry in execution_entries[1:]:
+            assert execution_entry.execution_path() == first_execution_path, (
+                '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
+                                        execution_entry,
+                                        first_execution_path,
+                                        execution_entries[0]))
+        return first_execution_path
 
 
-    def _get_consistent_execution_tag(self, queue_entries):
-        first_execution_tag = queue_entries[0].execution_tag()
-        for queue_entry in queue_entries[1:]:
-            assert queue_entry.execution_tag() == first_execution_tag, (
-                '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
-                                        queue_entry,
-                                        first_execution_tag,
-                                        queue_entries[0]))
-        return first_execution_tag
-
-
-    def _copy_results(self, queue_entries, use_monitor=None):
-        assert len(queue_entries) > 0
+    def _copy_results(self, execution_entries, use_monitor=None):
+        """
+        @param execution_entries: list of objects with execution_path() method
+        """
+        assert len(execution_entries) > 0
         if use_monitor is None:
             assert self.monitor
             use_monitor = self.monitor
         assert use_monitor.has_process()
-        execution_tag = self._get_consistent_execution_tag(queue_entries)
-        results_path = execution_tag + '/'
+        execution_path = self._get_consistent_execution_path(execution_entries)
+        results_path = execution_path + '/'
         _drone_manager.copy_to_results_repository(use_monitor.get_process(),
                                                   results_path)
 
@@ -1567,6 +1557,7 @@
 
 
     def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
+        assert not self.monitor
         if self.cmd:
             self.monitor = PidfileRunMonitor()
             self.monitor.run(self.cmd, self._working_directory,
@@ -1605,8 +1596,47 @@
         self._write_keyval_after_job("job_finished", int(time.time()))
 
 
-class RepairTask(AgentTask, TaskWithJobKeyvals):
-    def __init__(self, host, queue_entry=None, task=None):
+class SpecialAgentTask(AgentTask):
+    """
+    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
+    """
+
+    TASK_TYPE = None
+    host = None
+    queue_entry = None
+
+    def __init__(self, task, extra_command_args, **kwargs):
+        assert self.host
+        assert self.TASK_TYPE is not None, (
+                'self.TASK_TYPE must be overridden')
+        self.task = task
+        self._extra_command_args = extra_command_args
+        super(SpecialAgentTask, self).__init__(**kwargs)
+
+
+    def prolog(self):
+        super(SpecialAgentTask, self).prolog()
+        self.task = models.SpecialTask.prepare(self, self.task)
+        self.cmd = _autoserv_command_line(self.host.hostname,
+                                          self._extra_command_args,
+                                          queue_entry=self.queue_entry)
+        self._working_directory = self.task.execution_path()
+        self.task.activate()
+
+
+    def epilog(self):
+        super(SpecialAgentTask, self).epilog()
+        self.task.finish()
+        if self.monitor.has_process():
+            self._copy_results([self.task])
+
+
+class RepairTask(SpecialAgentTask, TaskWithJobKeyvals):
+    TASK_TYPE = models.SpecialTask.Task.REPAIR
+
+
+    def __init__(self, host, queue_entry=None, task=None,
+                 recover_run_monitor=None):
         """\
         queue_entry: queue entry to mark failed if this repair fails.
         """
@@ -1617,32 +1647,26 @@
         self.host = host
         self.queue_entry = queue_entry
 
-        self.create_temp_resultsdir('.repair')
-        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
-                                     ['-R', '--host-protection', protection],
-                                     queue_entry=queue_entry)
-        super(RepairTask, self).__init__(cmd, self.temp_results_dir)
+        super(RepairTask, self).__init__(
+                task, ['-R', '--host-protection', protection],
+                recover_run_monitor=recover_run_monitor)
 
         # *don't* include the queue entry in IDs -- if the queue entry is
         # aborted, we want to leave the repair task running
         self._set_ids(host=host)
 
-        self.set_host_log_file('repair', self.host)
-        self._task = task
-
 
     def prolog(self):
+        super(RepairTask, self).prolog()
         logging.info("repair_task starting")
         self.host.set_status('Repairing')
         if self.queue_entry:
             self.queue_entry.requeue()
 
-        self.task_type = models.SpecialTask.Task.REPAIR
-        self._task = models.SpecialTask.prepare(self, self._task)
 
 
     def _keyval_path(self):
-        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)
+        return os.path.join(self._working_directory, self._KEYVAL_FILE)
 
 
     def _fail_queue_entry(self):
@@ -1663,8 +1687,8 @@
         # copy results logs into the normal place for job results
         _drone_manager.copy_results_on_drone(
             self.monitor.get_process(),
-            source_path=self.temp_results_dir + '/',
-            destination_path=self.queue_entry.execution_tag() + '/')
+            source_path=self._working_directory + '/',
+            destination_path=self.queue_entry.execution_path() + '/')
 
         self._copy_results([self.queue_entry])
         if self.queue_entry.job.parse_failed_repair:
@@ -1675,8 +1699,6 @@
     def epilog(self):
         super(RepairTask, self).epilog()
 
-        self._task.finish()
-
         if self.success:
             self.host.set_status('Ready')
         else:
@@ -1685,70 +1707,61 @@
                 self._fail_queue_entry()
 
 
-class PreJobTask(AgentTask):
+class PreJobTask(SpecialAgentTask):
     def epilog(self):
         super(PreJobTask, self).epilog()
         should_copy_results = (self.queue_entry and not self.success
                                and not self.queue_entry.meta_host)
         if should_copy_results:
             self.queue_entry.set_execution_subdir()
-            destination = os.path.join(self.queue_entry.execution_tag(),
-                                       os.path.basename(self.log_file))
+            log_name = os.path.basename(self.task.execution_path())
+            source = os.path.join(self.task.execution_path(), 'debug',
+                                  'autoserv.DEBUG')
+            destination = os.path.join(self.queue_entry.execution_path(),
+                                       log_name)
             _drone_manager.copy_to_results_repository(
-                self.monitor.get_process(), self.log_file,
+                self.monitor.get_process(), source,
                 destination_path=destination)
 
 
 class VerifyTask(PreJobTask):
-    def __init__(self, queue_entry=None, host=None, task=None):
+    TASK_TYPE = models.SpecialTask.Task.VERIFY
+
+
+    def __init__(self, queue_entry=None, host=None, task=None,
+                 recover_run_monitor=None):
         assert bool(queue_entry) != bool(host)
         self.host = host or queue_entry.host
         self.queue_entry = queue_entry
 
-        self.create_temp_resultsdir('.verify')
-        cmd = _autoserv_command_line(self.host.hostname, self.temp_results_dir,
-                                     ['-v'], queue_entry=queue_entry)
         failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
-        super(VerifyTask, self).__init__(cmd, self.temp_results_dir,
-                                         failure_tasks=failure_tasks)
+        super(VerifyTask, self).__init__(
+                task, ['-v'], failure_tasks=failure_tasks,
+                recover_run_monitor=recover_run_monitor)
 
-        self.set_host_log_file('verify', self.host)
         self._set_ids(host=host, queue_entries=[queue_entry])
-        self._task = task
 
 
     def prolog(self):
         super(VerifyTask, self).prolog()
+
         logging.info("starting verify on %s", self.host.hostname)
         if self.queue_entry:
             self.queue_entry.set_status('Verifying')
         self.host.set_status('Verifying')
 
-        self.task_type = models.SpecialTask.Task.VERIFY
-
-        # Prepare all SpecialTasks associated with this verify.
-        # Include "active" verify tasks for recovery; we want the new log file
-        # and started time to be recorded
-        self._tasks = list(models.SpecialTask.objects.filter(
+        # Delete any other queued verifies for this host.  One verify will do
+        # and there's no need to keep records of other requests.
+        queued_verifies = models.SpecialTask.objects.filter(
             host__id=self.host.id,
             task=models.SpecialTask.Task.VERIFY,
-            is_complete=False,
-            queue_entry__isnull=True))
-        task_not_included = (
-            not self._task
-            or self._task.id not in [task.id for task in self._tasks])
-        if task_not_included:
-            self._tasks.append(self._task)
-        for i in range(len(self._tasks)):
-            self._tasks[i] = models.SpecialTask.prepare(self, self._tasks[i])
+            is_active=False, is_complete=False)
+        queued_verifies = queued_verifies.exclude(id=self.task.id)
+        queued_verifies.delete()
 
 
     def epilog(self):
         super(VerifyTask, self).epilog()
-
-        for task in self._tasks:
-            task.finish()
-
         if self.success:
             self.host.set_status('Ready')
 
@@ -1775,16 +1788,19 @@
 
 
 class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
-    def __init__(self, job, queue_entries, cmd, group_name=''):
+    def __init__(self, job, queue_entries, cmd=None, group_name='',
+                 recover_run_monitor=None):
         self.job = job
         self.queue_entries = queue_entries
         self.group_name = group_name
-        super(QueueTask, self).__init__(cmd, self._execution_tag())
+        super(QueueTask, self).__init__(
+                cmd, self._execution_path(),
+                recover_run_monitor=recover_run_monitor)
         self._set_ids(queue_entries=queue_entries)
 
 
     def _keyval_path(self):
-        return os.path.join(self._execution_tag(), self._KEYVAL_FILE)
+        return os.path.join(self._execution_path(), self._KEYVAL_FILE)
 
 
     def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
@@ -1792,7 +1808,7 @@
                                     for key, value in keyval_dict.iteritems())
         # always end with a newline to allow additional keyvals to be written
         keyval_contents += '\n'
-        _drone_manager.attach_file_to_execution(self._execution_tag(),
+        _drone_manager.attach_file_to_execution(self._execution_path(),
                                                 keyval_contents,
                                                 file_path=keyval_path)
 
@@ -1802,15 +1818,15 @@
 
 
     def _write_host_keyvals(self, host):
-        keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
+        keyval_path = os.path.join(self._execution_path(), 'host_keyvals',
                                    host.hostname)
         platform, all_labels = host.platform_and_labels()
         keyval_dict = dict(platform=platform, labels=','.join(all_labels))
         self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
 
 
-    def _execution_tag(self):
-        return self.queue_entries[0].execution_tag()
+    def _execution_path(self):
+        return self.queue_entries[0].execution_path()
 
 
     def prolog(self):
@@ -1831,7 +1847,7 @@
 
 
     def _write_lost_process_error_file(self):
-        error_file_path = os.path.join(self._execution_tag(), 'job_failure')
+        error_file_path = os.path.join(self._execution_path(), 'job_failure')
         _drone_manager.write_lines_to_file(error_file_path,
                                            [_LOST_PROCESS_ERROR])
 
@@ -1859,7 +1875,7 @@
 
     def _write_status_comment(self, comment):
         _drone_manager.write_lines_to_file(
-            os.path.join(self._execution_tag(), 'status.log'),
+            os.path.join(self._execution_path(), 'status.log'),
             ['INFO\t----\t----\t' + comment],
             paired_with_process=self.monitor.get_process())
 
@@ -1906,35 +1922,17 @@
         logging.info("queue_task finished with success=%s", self.success)
 
 
-class RecoveryQueueTask(QueueTask):
-    def __init__(self, job, queue_entries, run_monitor):
-        super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
-        self.monitor = run_monitor
-        self.started = True
-        # since we set started=True here, prolog() and run() shouldn't be called
-
-
-    def run(self):
-        raise NotImplemented('This should never be called')
-
-
-    def prolog(self):
-        raise NotImplemented('This should never be called')
-
-
 class PostJobTask(AgentTask):
     def __init__(self, queue_entries, pidfile_name, logfile_name,
-                 run_monitor=None):
-        """
-        If run_monitor != None, we're recovering a running task.
-        """
+                 recover_run_monitor=None):
         self._queue_entries = queue_entries
         self._pidfile_name = pidfile_name
 
-        self._execution_tag = self._get_consistent_execution_tag(queue_entries)
-        self._results_dir = _drone_manager.absolute_path(self._execution_tag)
+        self._execution_path = self._get_consistent_execution_path(
+                queue_entries)
+        self._results_dir = _drone_manager.absolute_path(self._execution_path)
         self._autoserv_monitor = PidfileRunMonitor()
-        self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
+        self._autoserv_monitor.attach_to_existing_process(self._execution_path)
         self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
 
         if _testing_mode:
@@ -1942,14 +1940,11 @@
         else:
             command = self._generate_command(self._results_dir)
 
-        super(PostJobTask, self).__init__(cmd=command,
-                                          working_directory=self._execution_tag)
-        # this must happen *after* the super call
-        self.monitor = run_monitor
-        if run_monitor:
-            self.started = True
+        super(PostJobTask, self).__init__(
+                cmd=command, working_directory=self._execution_path,
+                recover_run_monitor=recover_run_monitor)
 
-        self.log_file = os.path.join(self._execution_tag, logfile_name)
+        self.log_file = os.path.join(self._execution_path, logfile_name)
         self._final_status = self._determine_final_status()
 
 
@@ -1984,8 +1979,6 @@
 
 
     def run(self):
-        assert not self.monitor
-
         # make sure we actually have results to work with.
         # this should never happen in normal operation.
         if not self._autoserv_monitor.has_process():
@@ -2020,11 +2013,12 @@
     * spawning CleanupTasks for hosts, if necessary
     * spawning a FinalReparseTask for the job
     """
-    def __init__(self, job, queue_entries, run_monitor=None):
+    def __init__(self, job, queue_entries, recover_run_monitor=None):
         self._job = job
         super(GatherLogsTask, self).__init__(
             queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
-            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
+            logfile_name='.collect_crashinfo.log',
+            recover_run_monitor=recover_run_monitor)
         self._set_ids(queue_entries=queue_entries)
 
 
@@ -2064,24 +2058,23 @@
 
 
 class CleanupTask(PreJobTask):
-    def __init__(self, host=None, queue_entry=None, task=None):
+    TASK_TYPE = models.SpecialTask.Task.CLEANUP
+
+
+    def __init__(self, host=None, queue_entry=None, task=None,
+                 recover_run_monitor=None):
         assert bool(host) ^ bool(queue_entry)
         if queue_entry:
             host = queue_entry.get_host()
         self.queue_entry = queue_entry
         self.host = host
 
-        self.create_temp_resultsdir('.cleanup')
-        self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
-                                          ['--cleanup'],
-                                          queue_entry=queue_entry)
         repair_task = RepairTask(host, queue_entry=queue_entry)
-        super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
-                                          failure_tasks=[repair_task])
+        super(CleanupTask, self).__init__(
+                task, ['--cleanup'], failure_tasks=[repair_task],
+                recover_run_monitor=recover_run_monitor)
 
         self._set_ids(host=host, queue_entries=[queue_entry])
-        self.set_host_log_file('cleanup', self.host)
-        self._task = task
 
 
     def prolog(self):
@@ -2089,15 +2082,10 @@
         logging.info("starting cleanup task for host: %s", self.host.hostname)
         self.host.set_status("Cleaning")
 
-        self.task_type = models.SpecialTask.Task.CLEANUP
-        self._task = models.SpecialTask.prepare(self, self._task)
-
 
     def epilog(self):
         super(CleanupTask, self).epilog()
 
-        self._task.finish()
-
         if self.success:
             self.host.set_status('Ready')
             self.host.update_field('dirty', 0)
@@ -2106,14 +2094,14 @@
 class FinalReparseTask(PostJobTask):
     _num_running_parses = 0
 
-    def __init__(self, queue_entries, run_monitor=None):
-        super(FinalReparseTask, self).__init__(queue_entries,
-                                               pidfile_name=_PARSER_PID_FILE,
-                                               logfile_name='.parse.log',
-                                               run_monitor=run_monitor)
+    def __init__(self, queue_entries, recover_run_monitor=None):
+        super(FinalReparseTask, self).__init__(
+                queue_entries, pidfile_name=_PARSER_PID_FILE,
+                logfile_name='.parse.log',
+                recover_run_monitor=recover_run_monitor)
         # don't use _set_ids, since we don't want to set the host_ids
         self.queue_entry_ids = [entry.id for entry in queue_entries]
-        self._parse_started = (run_monitor is not None)
+        self._parse_started = self.started
 
 
     @classmethod
@@ -2808,6 +2796,10 @@
         return "%s/%s" % (self.job.tag(), self.execution_subdir)
 
 
+    def execution_path(self):
+        return self.execution_tag()
+
+
 class Job(DBObject):
     _table_name = 'jobs'
     _fields = ('id', 'owner', 'name', 'priority', 'control_file',
@@ -2999,7 +2991,7 @@
                               for entry in queue_entries])
 
         params = _autoserv_command_line(
-            hostnames, execution_tag,
+            hostnames,
             ['-P', execution_tag, '-n',
              _drone_manager.absolute_path(control_path)],
             job=self, verbose=False)