Add a new Archiving stage to the scheduler, which runs after Parsing.  This stage is responsible for copying results to the results server in a drone setup (a task currently performed directly by the scheduler) and allows for site-specific archiving hooks, replacing the old site_parse functionality.  It does this by running autoserv with a special control file (scheduler/archive_results.control.srv), which loads and runs code from the new scheduler.archive_results module; a sketch of those two files appears near the end of this message.  The implementation was mostly straightforward, as the archiving stage is fully analogous to the parser stage.  I did make a couple of refactorings:
* factored out the parser throttling code into a common superclass (SelfThrottledPostJobTask) so that ArchiveResultsTask could share it
* added generic flags to autoserv to generalize the special-case functionality we'd added for the --collect-crashinfo option -- namely, --pidfile-label to specify a different pidfile name, and --use-existing-results to tell autoserv to allow (and even expect) an existing results directory (example invocations follow this list).  In the future, I think it'd be more elegant to make crashinfo collection run using a special control file (as archiving does), rather than a hard-coded command-line option.
* moved the call to server_job.init_parser() out of the constructor, since it was an easy source of exceptions that wouldn't get logged.
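
With the new flags, the two post-job autoserv invocations generated by the scheduler look roughly like the following (the results paths are hypothetical; the flags and pidfile labels come from the _generate_command() and _pidfile_label() implementations in the diff below):

    # crashinfo collection (GatherLogsTask)
    autoserv -p --pidfile-label=collect_crashinfo --use-existing-results \
        --collect-crashinfo -m host1,host2 -r /results/123-user/group0

    # archiving (ArchiveResultsTask)
    autoserv -p --pidfile-label=archiver -r /results/123-user/group0 \
        --use-existing-results ../scheduler/archive_results.control.srv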

Note: I believe some of the functional test changes slipped into my previous change, which is why that part of this diff looks smaller than you'd expect.
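
For reference, here is a minimal, hypothetical sketch of the control file and module; the actual files are added elsewhere in this change and aren't shown in this diff.  The module sketch assumes the import_site_class() convention autotest uses for other site_* hooks:

    # scheduler/archive_results.control.srv (sketch) -- autoserv executes
    # control files with the server_job instance in scope as `job`
    from autotest_lib.scheduler import archive_results
    archive_results.ResultsArchiver(job).archive()

    # scheduler/archive_results.py (sketch)
    import common
    from autotest_lib.client.common_lib import utils

    class BaseResultsArchiver(object):
        def __init__(self, job):
            self._job = job

        def archive(self):
            # default: nothing beyond the copy back to the results
            # repository; sites override this in site_archive_results.py
            pass

    # use SiteResultsArchiver if the site defines one, else the base class
    ResultsArchiver = utils.import_site_class(
            __file__, 'autotest_lib.scheduler.site_archive_results',
            'SiteResultsArchiver', BaseResultsArchiver)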

Signed-off-by: Steve Howard <showard@google.com>

==== (deleted) //depot/google_vendor_src_branch/autotest/tko/site_parse.py ====


git-svn-id: http://test.kernel.org/svn/autotest/trunk@4070 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index d4ec92a..9e4a654 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -45,9 +45,10 @@
 _AUTOSERV_PID_FILE = '.autoserv_execute'
 _CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
 _PARSER_PID_FILE = '.parser_execute'
+_ARCHIVER_PID_FILE = '.archiver_execute'
 
 _ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
-                      _PARSER_PID_FILE)
+                      _PARSER_PID_FILE, _ARCHIVER_PID_FILE)
 
 # error message to leave in results dir when an autoserv process disappears
 # mysteriously
@@ -766,7 +767,8 @@
         statuses = (models.HostQueueEntry.Status.STARTING,
                     models.HostQueueEntry.Status.RUNNING,
                     models.HostQueueEntry.Status.GATHERING,
-                    models.HostQueueEntry.Status.PARSING)
+                    models.HostQueueEntry.Status.PARSING,
+                    models.HostQueueEntry.Status.ARCHIVING)
         status_list = ','.join("'%s'" % status for status in statuses)
         queue_entries = HostQueueEntry.fetch(
                 where='status IN (%s)' % status_list)
@@ -812,16 +814,19 @@
             return GatherLogsTask(queue_entries=task_entries)
         if queue_entry.status == models.HostQueueEntry.Status.PARSING:
             return FinalReparseTask(queue_entries=task_entries)
+        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
+            return ArchiveResultsTask(queue_entries=task_entries)
 
         raise SchedulerError('_get_agent_task_for_queue_entry got entry with '
                              'invalid status %s: %s' % (entry.status, entry))
 
 
     def _check_for_duplicate_host_entries(self, task_entries):
-        parsing_status = models.HostQueueEntry.Status.PARSING
+        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
+                             models.HostQueueEntry.Status.ARCHIVING)
         for task_entry in task_entries:
             using_host = (task_entry.host is not None
-                          and task_entry.status != parsing_status)
+                          and task_entry.status not in non_host_statuses)
             if using_host:
                 self._assert_host_has_no_agent(task_entry)
 
@@ -1607,9 +1612,9 @@
             queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
 
 
-    def _copy_and_parse_results(self, queue_entries, use_monitor=None):
-        self._copy_results(queue_entries, use_monitor)
-        self._parse_results(queue_entries)
+    def _archive_results(self, queue_entries):
+        for queue_entry in queue_entries:
+            queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
 
 
     def _command_line(self):
@@ -1721,6 +1726,22 @@
                         self._working_directory()))
 
 
+    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
+                                    allowed_host_statuses=None):
+        for entry in queue_entries:
+            if entry.status not in allowed_hqe_statuses:
+                raise SchedulerError('Queue task attempting to start '
+                                     'entry with invalid status %s: %s'
+                                     % (entry.status, entry))
+            invalid_host_status = (
+                    allowed_host_statuses is not None
+                    and entry.host.status not in allowed_host_statuses)
+            if invalid_host_status:
+                raise SchedulerError('Queue task attempting to start on queue '
+                                     'entry with invalid host status %s: %s'
+                                     % (entry.host.status, entry))
+
+
 class TaskWithJobKeyvals(object):
     """AgentTask mixin providing functionality to help with job keyval files."""
     _KEYVAL_FILE = 'keyval'
@@ -1844,17 +1865,15 @@
                 source_path=self._working_directory() + '/',
                 destination_path=self.queue_entry.execution_path() + '/')
 
-        self._copy_results([self.queue_entry])
-
-        if not self.queue_entry.job.parse_failed_repair:
-            self.queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
-            return
-
         pidfile_id = _drone_manager.get_pidfile_id_from(
                 self.queue_entry.execution_path(),
                 pidfile_name=_AUTOSERV_PID_FILE)
         _drone_manager.register_pidfile(pidfile_id)
-        self._parse_results([self.queue_entry])
+
+        if self.queue_entry.job.parse_failed_repair:
+            self._parse_results([self.queue_entry])
+        else:
+            self._archive_results([self.queue_entry])
 
 
     def cleanup(self):
@@ -1995,6 +2014,55 @@
                 self.host.set_status(models.Host.Status.READY)
 
 
+class CleanupTask(PreJobTask):
+    # note this can also run post-job, but when it does, it's running standalone
+    # against the host (not related to the job), so it's not considered a
+    # PostJobTask
+
+    TASK_TYPE = models.SpecialTask.Task.CLEANUP
+
+
+    def __init__(self, task, recover_run_monitor=None):
+        super(CleanupTask, self).__init__(task, ['--cleanup'])
+        self._set_ids(host=self.host, queue_entries=[self.queue_entry])
+
+
+    def prolog(self):
+        super(CleanupTask, self).prolog()
+        logging.info("starting cleanup task for host: %s", self.host.hostname)
+        self.host.set_status(models.Host.Status.CLEANING)
+        if self.queue_entry:
+            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
+
+
+    def _finish_epilog(self):
+        if not self.queue_entry or not self.success:
+            return
+
+        do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
+        should_run_verify = (
+                self.queue_entry.job.run_verify
+                and self.host.protection != do_not_verify_protection)
+        if should_run_verify:
+            entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
+            models.SpecialTask.objects.create(
+                    host=models.Host.objects.get(id=self.host.id),
+                    queue_entry=entry,
+                    task=models.SpecialTask.Task.VERIFY)
+        else:
+            self.queue_entry.on_pending()
+
+
+    def epilog(self):
+        super(CleanupTask, self).epilog()
+
+        if self.success:
+            self.host.update_field('dirty', 0)
+            self.host.set_status(models.Host.Status.READY)
+
+        self._finish_epilog()
+
+
 class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
     """
     Common functionality for QueueTask and HostlessQueueTask
@@ -2110,17 +2178,12 @@
 
 
     def prolog(self):
-        for entry in self.queue_entries:
-            if entry.status not in (models.HostQueueEntry.Status.STARTING,
-                                    models.HostQueueEntry.Status.RUNNING):
-                raise SchedulerError('Queue task attempting to start '
-                                     'entry with invalid status %s: %s'
-                                     % (entry.status, entry))
-            if entry.host.status not in (models.Host.Status.PENDING,
-                                         models.Host.Status.RUNNING):
-                raise SchedulerError('Queue task attempting to start on queue '
-                                     'entry with invalid host status %s: %s'
-                                     % (entry.host.status, entry))
+        self._check_queue_entry_statuses(
+                self.queue_entries,
+                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
+                                      models.HostQueueEntry.Status.RUNNING),
+                allowed_host_statuses=(models.Host.Status.PENDING,
+                                       models.Host.Status.RUNNING))
 
         super(QueueTask, self).prolog()
 
@@ -2141,6 +2204,30 @@
             queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
 
 
+class HostlessQueueTask(AbstractQueueTask):
+    def __init__(self, queue_entry):
+        super(HostlessQueueTask, self).__init__([queue_entry])
+        self.queue_entry_ids = [queue_entry.id]
+
+
+    def prolog(self):
+        self.queue_entries[0].update_field('execution_subdir', 'hostless')
+        super(HostlessQueueTask, self).prolog()
+
+
+    def _final_status(self):
+        if self.queue_entries[0].aborted:
+            return models.HostQueueEntry.Status.ABORTED
+        if self.monitor.exit_code() == 0:
+            return models.HostQueueEntry.Status.COMPLETED
+        return models.HostQueueEntry.Status.FAILED
+
+
+    def _finish_task(self):
+        super(HostlessQueueTask, self)._finish_task()
+        self.queue_entries[0].set_status(self._final_status())
+
+
 class PostJobTask(AgentTask):
     def __init__(self, queue_entries, log_file_name):
         super(PostJobTask, self).__init__(log_file_name=log_file_name)
@@ -2213,6 +2300,11 @@
         pass
 
 
+    def _pidfile_label(self):
+        # '.autoserv_execute' -> 'autoserv'
+        return self._pidfile_name()[1:-len('_execute')]
+
+
 class GatherLogsTask(PostJobTask):
     """
     Task responsible for
@@ -2231,8 +2323,10 @@
     def _generate_command(self, results_dir):
         host_list = ','.join(queue_entry.host.hostname
                              for queue_entry in self.queue_entries)
-        return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
-                '-r', results_dir]
+        return [_autoserv_path , '-p',
+                '--pidfile-label=%s' % self._pidfile_label(),
+                '--use-existing-results', '--collect-crashinfo',
+                '-m', host_list, '-r', results_dir]
 
 
     @property
@@ -2245,23 +2339,17 @@
 
 
     def prolog(self):
-        for queue_entry in self.queue_entries:
-            if queue_entry.status != models.HostQueueEntry.Status.GATHERING:
-                raise SchedulerError('Gather task attempting to start on '
-                                     'non-gathering entry: %s' % queue_entry)
-            if queue_entry.host.status != models.Host.Status.RUNNING:
-                raise SchedulerError('Gather task attempting to start on queue '
-                                     'entry with non-running host status %s: %s'
-                                     % (queue_entry.host.status, queue_entry))
+        self._check_queue_entry_statuses(
+                self.queue_entries,
+                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
+                allowed_host_statuses=(models.Host.Status.RUNNING,))
 
         super(GatherLogsTask, self).prolog()
 
 
     def epilog(self):
         super(GatherLogsTask, self).epilog()
-
-        self._copy_and_parse_results(self.queue_entries,
-                                     use_monitor=self._autoserv_monitor)
+        self._parse_results(self.queue_entries)
         self._reboot_hosts()
 
 
@@ -2304,54 +2392,67 @@
             self.finished(True)
 
 
-class CleanupTask(PreJobTask):
-    TASK_TYPE = models.SpecialTask.Task.CLEANUP
+class SelfThrottledPostJobTask(PostJobTask):
+    """
+    Special AgentTask subclass that maintains its own global process limit.
+    """
+    _num_running_processes = 0
 
 
-    def __init__(self, task, recover_run_monitor=None):
-        super(CleanupTask, self).__init__(task, ['--cleanup'])
-        self._set_ids(host=self.host, queue_entries=[self.queue_entry])
+    @classmethod
+    def _increment_running_processes(cls):
+        cls._num_running_processes += 1
 
 
-    def prolog(self):
-        super(CleanupTask, self).prolog()
-        logging.info("starting cleanup task for host: %s", self.host.hostname)
-        self.host.set_status(models.Host.Status.CLEANING)
-        if self.queue_entry:
-            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
+    @classmethod
+    def _decrement_running_processes(cls):
+        cls._num_running_processes -= 1
 
 
-    def _finish_epilog(self):
-        if not self.queue_entry or not self.success:
+    @classmethod
+    def _max_processes(cls):
+        raise NotImplementedError
+
+
+    @classmethod
+    def _can_run_new_process(cls):
+        return cls._num_running_processes < cls._max_processes()
+
+
+    def _process_started(self):
+        return bool(self.monitor)
+
+
+    def tick(self):
+        # override tick to keep trying to start until the process count goes
+        # down and we can, at which point we revert to default behavior
+        if self._process_started():
+            super(SelfThrottledPostJobTask, self).tick()
+        else:
+            self._try_starting_process()
+
+
+    def run(self):
+        # override run() to not actually run unless we can
+        self._try_starting_process()
+
+
+    def _try_starting_process(self):
+        if not self._can_run_new_process():
             return
 
-        do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
-        should_run_verify = (
-                self.queue_entry.job.run_verify
-                and self.host.protection != do_not_verify_protection)
-        if should_run_verify:
-            entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
-            models.SpecialTask.objects.create(
-                    host=models.Host.objects.get(id=self.host.id),
-                    queue_entry=entry,
-                    task=models.SpecialTask.Task.VERIFY)
-        else:
-            self.queue_entry.on_pending()
+        # actually run the command
+        super(SelfThrottledPostJobTask, self).run()
+        self._increment_running_processes()
 
 
-    def epilog(self):
-        super(CleanupTask, self).epilog()
-
-        if self.success:
-            self.host.update_field('dirty', 0)
-            self.host.set_status(models.Host.Status.READY)
-
-        self._finish_epilog()
+    def finished(self, success):
+        super(SelfThrottledPostJobTask, self).finished(success)
+        if self._process_started():
+            self._decrement_running_processes()
 
 
-class FinalReparseTask(PostJobTask):
-    _num_running_parses = 0
-
+class FinalReparseTask(SelfThrottledPostJobTask):
     def __init__(self, queue_entries):
         super(FinalReparseTask, self).__init__(queue_entries,
                                                log_file_name='.parse.log')
@@ -2360,7 +2461,7 @@
 
 
     def _generate_command(self, results_dir):
-        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
+        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                 results_dir]
 
 
@@ -2373,91 +2474,59 @@
         return _PARSER_PID_FILE
 
 
-    def _parse_started(self):
-        return bool(self.monitor)
-
-
     @classmethod
-    def _increment_running_parses(cls):
-        cls._num_running_parses += 1
-
-
-    @classmethod
-    def _decrement_running_parses(cls):
-        cls._num_running_parses -= 1
-
-
-    @classmethod
-    def _can_run_new_parse(cls):
-        return (cls._num_running_parses <
-                scheduler_config.config.max_parse_processes)
+    def _max_processes(cls):
+        return scheduler_config.config.max_parse_processes
 
 
     def prolog(self):
-        for queue_entry in self.queue_entries:
-            if queue_entry.status != models.HostQueueEntry.Status.PARSING:
-                raise SchedulerError('Parse task attempting to start on '
-                                     'non-parsing entry: %s' % queue_entry)
+        self._check_queue_entry_statuses(
+                self.queue_entries,
+                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
 
         super(FinalReparseTask, self).prolog()
 
 
     def epilog(self):
         super(FinalReparseTask, self).epilog()
-        self._set_all_statuses(self._final_status())
+        self._archive_results(self.queue_entries)
 
 
-    def tick(self):
-        # override tick to keep trying to start until the parse count goes down
-        # and we can, at which point we revert to default behavior
-        if self._parse_started():
-            super(FinalReparseTask, self).tick()
-        else:
-            self._try_starting_parse()
+class ArchiveResultsTask(SelfThrottledPostJobTask):
+    def __init__(self, queue_entries):
+        super(ArchiveResultsTask, self).__init__(queue_entries,
+                                                 log_file_name='.archiving.log')
+        # don't use _set_ids, since we don't want to set the host_ids
+        self.queue_entry_ids = [entry.id for entry in queue_entries]
 
 
-    def run(self):
-        # override run() to not actually run unless we can
-        self._try_starting_parse()
+    def _pidfile_name(self):
+        return _ARCHIVER_PID_FILE
 
 
-    def _try_starting_parse(self):
-        if not self._can_run_new_parse():
-            return
-
-        # actually run the parse command
-        super(FinalReparseTask, self).run()
-        self._increment_running_parses()
+    def _generate_command(self, results_dir):
+        return [_autoserv_path , '-p',
+                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
+                '--use-existing-results',
+                os.path.join('..', 'scheduler', 'archive_results.control.srv')]
 
 
-    def finished(self, success):
-        super(FinalReparseTask, self).finished(success)
-        if self._parse_started():
-            self._decrement_running_parses()
-
-
-class HostlessQueueTask(AbstractQueueTask):
-    def __init__(self, queue_entry):
-        super(HostlessQueueTask, self).__init__([queue_entry])
-        self.queue_entry_ids = [queue_entry.id]
+    @classmethod
+    def _max_processes(cls):
+        return scheduler_config.config.max_transfer_processes
 
 
     def prolog(self):
-        self.queue_entries[0].update_field('execution_subdir', 'hostless')
-        super(HostlessQueueTask, self).prolog()
+        self._check_queue_entry_statuses(
+                self.queue_entries,
+                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
+
+        super(ArchiveResultsTask, self).prolog()
 
 
-    def _final_status(self):
-        if self.queue_entries[0].aborted:
-            return models.HostQueueEntry.Status.ABORTED
-        if self.monitor.exit_code() == 0:
-            return models.HostQueueEntry.Status.COMPLETED
-        return models.HostQueueEntry.Status.FAILED
-
-
-    def _finish_task(self):
-        super(HostlessQueueTask, self)._finish_task()
-        self.queue_entries[0].set_status(self._final_status())
+    def epilog(self):
+        super(ArchiveResultsTask, self).epilog()
+        self._set_all_statuses(self._final_status())
 
 
 class DBError(Exception):
@@ -2899,34 +2968,22 @@
 
         self.update_field('status', status)
 
-        if status in (models.HostQueueEntry.Status.QUEUED,
-                      models.HostQueueEntry.Status.PARSING):
-            self.update_field('complete', False)
-            self.update_field('active', False)
+        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
+        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
+        assert not (active and complete)
 
-        if status in (models.HostQueueEntry.Status.PENDING,
-                      models.HostQueueEntry.Status.RUNNING,
-                      models.HostQueueEntry.Status.VERIFYING,
-                      models.HostQueueEntry.Status.STARTING,
-                      models.HostQueueEntry.Status.GATHERING):
-            self.update_field('complete', False)
-            self.update_field('active', True)
+        self.update_field('active', active)
+        self.update_field('complete', complete)
 
-        if status in (models.HostQueueEntry.Status.FAILED,
-                      models.HostQueueEntry.Status.COMPLETED,
-                      models.HostQueueEntry.Status.STOPPED,
-                      models.HostQueueEntry.Status.ABORTED):
-            self.update_field('complete', True)
-            self.update_field('active', False)
+        if complete:
             self._on_complete()
+            self._email_on_job_complete()
 
         should_email_status = (status.lower() in _notify_email_statuses or
                                'all' in _notify_email_statuses)
         if should_email_status:
             self._email_on_status(status)
 
-        self._email_on_job_complete()
-
 
     def _on_complete(self):
         self.job.stop_if_necessary()
@@ -3063,7 +3120,7 @@
         assert self.aborted and not self.complete
 
         Status = models.HostQueueEntry.Status
-        if self.status in (Status.GATHERING, Status.PARSING):
+        if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING):
             # do nothing; post-job tasks will finish and then mark this entry
             # with status "Aborted" and take care of the host
             return