[autotest] Scheduler refactor.

Break scheduler into simpler modules.
This change also modifies run_pylint to check for undefined variables.

BUG=chromium:312338
TEST=Ran smoke suite against multiple duts.
     Triggered agents like repair, verify etc. Pylint, Unittests.
DEPLOY=scheduler

Change-Id: Ibd685a27b5b50abd26cdf2976ac4189c3e9acc0a
Reviewed-on: https://chromium-review.googlesource.com/174080
Commit-Queue: Prashanth B <beeps@chromium.org>
Tested-by: Prashanth B <beeps@chromium.org>
Reviewed-by: Alex Miller <milleral@chromium.org>
diff --git a/scheduler/postjob_task.py b/scheduler/postjob_task.py
new file mode 100644
index 0000000..4382093
--- /dev/null
+++ b/scheduler/postjob_task.py
@@ -0,0 +1,390 @@
+#pylint: disable-msg=C0111
+
+"""
+Postjob task.
+
+Postjob tasks are responsible for setting the final status of the HQE
+and Host, and scheduling additional special agents such as cleanup,
+if necessary.
+"""
+
+import os
+
+from autotest_lib.frontend.afe import models, model_attributes
+from autotest_lib.scheduler import agent_task, drones, drone_manager
+from autotest_lib.scheduler import email_manager, pidfile_monitor
+from autotest_lib.scheduler import scheduler_config
+from autotest_lib.server import autoserv_utils
+from autotest_lib.site_utils.graphite import stats
+
+
+_drone_manager = drone_manager.instance()
+_parser_path = autoserv_utils._parser_path_func(
+                autoserv_utils.AUTOTEST_INSTALL_DIR)
+
+
+class PostJobTask(agent_task.AgentTask):
+    def __init__(self, queue_entries, log_file_name):
+        super(PostJobTask, self).__init__(log_file_name=log_file_name)
+
+        self.queue_entries = queue_entries
+
+        self._autoserv_monitor = pidfile_monitor.PidfileRunMonitor()
+        self._autoserv_monitor.attach_to_existing_process(
+                self._working_directory())
+
+
+    def _command_line(self):
+        # Do we need testing_mode?
+        return self._generate_command(
+                _drone_manager.absolute_path(self._working_directory()))
+
+
+    def _generate_command(self, results_dir):
+        raise NotImplementedError('Subclasses must override this')
+
+
+    @property
+    def owner_username(self):
+        return self.queue_entries[0].job.owner
+
+
+    def _working_directory(self):
+        return self._get_consistent_execution_path(self.queue_entries)
+
+
+    def _paired_with_monitor(self):
+        return self._autoserv_monitor
+
+
+    def _job_was_aborted(self):
+        was_aborted = None
+        for queue_entry in self.queue_entries:
+            queue_entry.update_from_database()
+            if was_aborted is None: # first queue entry
+                was_aborted = bool(queue_entry.aborted)
+            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
+                entries = ['%s (aborted: %s)' % (entry, entry.aborted)
+                           for entry in self.queue_entries]
+                email_manager.manager.enqueue_notify_email(
+                        'Inconsistent abort state',
+                        'Queue entries have inconsistent abort state:\n' +
+                        '\n'.join(entries))
+                # don't crash here, just assume true
+                return True
+        return was_aborted
+
+
+    def _final_status(self):
+        if self._job_was_aborted():
+            return models.HostQueueEntry.Status.ABORTED
+
+        # we'll use a PidfileRunMonitor to read the autoserv exit status
+        if self._autoserv_monitor.exit_code() == 0:
+            return models.HostQueueEntry.Status.COMPLETED
+        return models.HostQueueEntry.Status.FAILED
+
+
+    def _set_all_statuses(self, status):
+        for queue_entry in self.queue_entries:
+            queue_entry.set_status(status)
+
+
+    def abort(self):
+        # override AgentTask.abort() to avoid killing the process and ending
+        # the task.  post-job tasks continue when the job is aborted.
+        pass
+
+
+    def _pidfile_label(self):
+        # '.autoserv_execute' -> 'autoserv'
+        return self._pidfile_name()[1:-len('_execute')]
+
+
+class SelfThrottledPostJobTask(PostJobTask):
+    """
+    PostJobTask that maintains its own process limit.
+
+    We throttle tasks like parsing because we don't want them to
+    hold up tests. At the same time we don't wish to build up load
+    that will take forever to parse.
+    """
+    _num_running_processes = 0
+    # Last known limit of max processes, used to check whether
+    # max processes config has been changed.
+    _last_known_max_processes = 0
+    # Whether an email should be sent to notifiy process limit being hit.
+    _notification_on = True
+    # Once process limit is hit, an email will be sent.
+    # To prevent spams, do not send another email until
+    # it drops to lower than the following level.
+    REVIVE_NOTIFICATION_THRESHOLD = 0.80
+
+
+    @classmethod
+    def _increment_running_processes(cls):
+        cls._num_running_processes += 1
+        stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
+                                      cls._num_running_processes)
+
+
+    @classmethod
+    def _decrement_running_processes(cls):
+        cls._num_running_processes -= 1
+        stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
+                                      cls._num_running_processes)
+
+
+    @classmethod
+    def _max_processes(cls):
+        raise NotImplementedError
+
+
+    @classmethod
+    def _can_run_new_process(cls):
+        return cls._num_running_processes < cls._max_processes()
+
+
+    def _process_started(self):
+        return bool(self.monitor)
+
+
+    def tick(self):
+        # override tick to keep trying to start until the process count goes
+        # down and we can, at which point we revert to default behavior
+        if self._process_started():
+            super(SelfThrottledPostJobTask, self).tick()
+        else:
+            self._try_starting_process()
+
+
+    def run(self):
+        # override run() to not actually run unless we can
+        self._try_starting_process()
+
+
+    @classmethod
+    def _notify_process_limit_hit(cls):
+        """Send an email to notify that process limit is hit."""
+        if cls._notification_on:
+            subject = '%s: hitting max process limit.' % cls.__name__
+            message = ('Running processes/Max processes: %d/%d'
+                       % (cls._num_running_processes, cls._max_processes()))
+            email_manager.manager.enqueue_notify_email(subject, message)
+            cls._notification_on = False
+
+
+    @classmethod
+    def _reset_notification_switch_if_necessary(cls):
+        """Reset _notification_on if necessary.
+
+        Set _notification_on to True on the following cases:
+        1) If the limit of max processes configuration changes;
+        2) If _notification_on is False and the number of running processes
+           drops to lower than a level defined in REVIVE_NOTIFICATION_THRESHOLD.
+
+        """
+        if cls._last_known_max_processes != cls._max_processes():
+            cls._notification_on = True
+            cls._last_known_max_processes = cls._max_processes()
+            return
+        percentage = float(cls._num_running_processes) / cls._max_processes()
+        if (not cls._notification_on and
+            percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
+            cls._notification_on = True
+
+
+    def _try_starting_process(self):
+        self._reset_notification_switch_if_necessary()
+        if not self._can_run_new_process():
+            self._notify_process_limit_hit()
+            return
+
+        # actually run the command
+        super(SelfThrottledPostJobTask, self).run()
+        if self._process_started():
+            self._increment_running_processes()
+
+
+    def finished(self, success):
+        super(SelfThrottledPostJobTask, self).finished(success)
+        if self._process_started():
+            self._decrement_running_processes()
+
+
+class GatherLogsTask(PostJobTask):
+    """
+    Task responsible for
+    * gathering uncollected logs (if Autoserv crashed hard or was killed)
+    * copying logs to the results repository
+    * spawning CleanupTasks for hosts, if necessary
+    * spawning a FinalReparseTask for the job
+    * setting the final status of the host, directly or through a cleanup
+    """
+    def __init__(self, queue_entries, recover_run_monitor=None):
+        self._job = queue_entries[0].job
+        super(GatherLogsTask, self).__init__(
+            queue_entries, log_file_name='.collect_crashinfo.log')
+        self._set_ids(queue_entries=queue_entries)
+
+
+    # TODO: Refactor into autoserv_utils. crbug.com/243090
+    def _generate_command(self, results_dir):
+        host_list = ','.join(queue_entry.host.hostname
+                             for queue_entry in self.queue_entries)
+        return [autoserv_utils.autoserv_path , '-p',
+                '--pidfile-label=%s' % self._pidfile_label(),
+                '--use-existing-results', '--collect-crashinfo',
+                '-m', host_list, '-r', results_dir]
+
+
+    @property
+    def num_processes(self):
+        return len(self.queue_entries)
+
+
+    def _pidfile_name(self):
+        return drone_manager.CRASHINFO_PID_FILE
+
+
+    def prolog(self):
+        self._check_queue_entry_statuses(
+                self.queue_entries,
+                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
+                allowed_host_statuses=(models.Host.Status.RUNNING,))
+
+        super(GatherLogsTask, self).prolog()
+
+
+    def epilog(self):
+        super(GatherLogsTask, self).epilog()
+        self._parse_results(self.queue_entries)
+        self._reboot_hosts()
+
+
+    def _reboot_hosts(self):
+        if self._autoserv_monitor.has_process():
+            final_success = (self._final_status() ==
+                             models.HostQueueEntry.Status.COMPLETED)
+            num_tests_failed = self._autoserv_monitor.num_tests_failed()
+        else:
+            final_success = False
+            num_tests_failed = 0
+        reboot_after = self._job.reboot_after
+        do_reboot = (
+                # always reboot after aborted jobs
+                self._final_status() == models.HostQueueEntry.Status.ABORTED
+                or reboot_after == model_attributes.RebootAfter.ALWAYS
+                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
+                    and final_success and num_tests_failed == 0)
+                or num_tests_failed > 0)
+
+        for queue_entry in self.queue_entries:
+            if do_reboot:
+                # don't pass the queue entry to the CleanupTask. if the cleanup
+                # fails, the job doesn't care -- it's over.
+                models.SpecialTask.objects.create(
+                        host=models.Host.objects.get(id=queue_entry.host.id),
+                        task=models.SpecialTask.Task.CLEANUP,
+                        requested_by=self._job.owner_model())
+            else:
+                queue_entry.host.set_status(models.Host.Status.READY)
+
+
+    def run(self):
+        autoserv_exit_code = self._autoserv_monitor.exit_code()
+        # only run if Autoserv exited due to some signal. if we have no exit
+        # code, assume something bad (and signal-like) happened.
+        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
+            super(GatherLogsTask, self).run()
+        else:
+            self.finished(True)
+
+
+class FinalReparseTask(SelfThrottledPostJobTask):
+    def __init__(self, queue_entries):
+        super(FinalReparseTask, self).__init__(queue_entries,
+                                               log_file_name='.parse.log')
+        # don't use _set_ids, since we don't want to set the host_ids
+        self.queue_entry_ids = [entry.id for entry in queue_entries]
+
+
+    def _generate_command(self, results_dir):
+        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
+                results_dir]
+
+
+    @property
+    def num_processes(self):
+        return 0 # don't include parser processes in accounting
+
+
+    def _pidfile_name(self):
+        return drone_manager.PARSER_PID_FILE
+
+
+    @classmethod
+    def _max_processes(cls):
+        return scheduler_config.config.max_parse_processes
+
+
+    def prolog(self):
+        self._check_queue_entry_statuses(
+                self.queue_entries,
+                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
+
+        super(FinalReparseTask, self).prolog()
+
+
+    def epilog(self):
+        super(FinalReparseTask, self).epilog()
+        self._archive_results(self.queue_entries)
+
+
+class ArchiveResultsTask(SelfThrottledPostJobTask):
+    _ARCHIVING_FAILED_FILE = '.archiver_failed'
+
+    def __init__(self, queue_entries):
+        super(ArchiveResultsTask, self).__init__(queue_entries,
+                                                 log_file_name='.archiving.log')
+        # don't use _set_ids, since we don't want to set the host_ids
+        self.queue_entry_ids = [entry.id for entry in queue_entries]
+
+
+    def _pidfile_name(self):
+        return drone_manager.ARCHIVER_PID_FILE
+
+
+    # TODO: Refactor into autoserv_utils. crbug.com/243090
+    def _generate_command(self, results_dir):
+        return [autoserv_utils.autoserv_path , '-p',
+                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
+                '--use-existing-results', '--control-filename=control.archive',
+                os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
+                             'archive_results.control.srv')]
+
+
+    @classmethod
+    def _max_processes(cls):
+        return scheduler_config.config.max_transfer_processes
+
+
+    def prolog(self):
+        self._check_queue_entry_statuses(
+                self.queue_entries,
+                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
+
+        super(ArchiveResultsTask, self).prolog()
+
+
+    def epilog(self):
+        super(ArchiveResultsTask, self).epilog()
+        if not self.success and self._paired_with_monitor().has_process():
+            failed_file = os.path.join(self._working_directory(),
+                                       self._ARCHIVING_FAILED_FILE)
+            paired_process = self._paired_with_monitor().get_process()
+            _drone_manager.write_lines_to_file(
+                    failed_file, ['Archiving failed with exit code %s'
+                                  % self.monitor.exit_code()],
+                    paired_with_process=paired_process)
+        self._set_all_statuses(self._final_status())