Add options to control reboots before and after a job.

-add reboot_before and reboot_after fields to Job, along with enums for each (sketched below)
-add options to create_job RPC for reboot_before and reboot_after
-add options to the job create CLI for these fields, and make job stat -v display them
-add widgets to the AFE job create page for these fields, and make the job detail view display them
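
A minimal sketch of the new enums, assuming the common_lib enum helper (the
real definitions live on the Job model in frontend/afe/models.py and may
differ in detail):

    from autotest_lib.client.common_lib import enum

    # enum.Enum turns each label into an int constant on the object
    # (NEVER = 0, IF_DIRTY = 1, ALWAYS = 2 -- labels are uppercased and
    # spaces become underscores), which is what the scheduler compares
    # against, e.g. models.Job.RebootAfter.IF_ALL_TESTS_PASSED.
    RebootBefore = enum.Enum('Never', 'If dirty', 'Always')
    RebootAfter = enum.Enum('Never', 'If all tests passed', 'Always')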

-add dirty field to Hosts, defaulting to True, and set it to True when a host is locked (sketched below)
-made the scheduler set this field when a job runs and clear it when a host is successfully rebooted
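
A minimal sketch of the Host model change (the save() hook is an assumption
about where the lock-marks-dirty logic lives; the real code is in
frontend/afe/models.py):

    from django.db import models as dbmodels

    class Host(dbmodels.Model):
        # ...existing fields...
        dirty = dbmodels.BooleanField(default=True)

        def save(self):
            # assumption: locking a host marks it dirty, so jobs with
            # reboot_before=IF_DIRTY will reboot it before running
            if self.locked:
                self.dirty = True
            super(Host, self).save()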

-updated the scheduler's PidfileRunMonitor to read a new three-line .autoserv_execute format, where the third line contains the number of tests that failed (example below)
-made the scheduler's Job.run() include a RebootTask before the verify task, according to the reboot_before option
-made QueueTask.epilog() launch a RebootTask for each host, according to the reboot_after option
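
For illustration, a finished run's .autoserv_execute now looks like this
(pid on line one, exit status on line two, failed-test count on line three;
the numbers are made up):

    3212
    0
    2

While autoserv is still running, only the pid line is present; old two-line
pidfiles are still accepted, with the failed-test count defaulting to zero.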

-updated autoserv to write out a third line to .autoserv_execute containing the number of failed tests.
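
A hedged sketch of the autoserv side (variable names are made up; the real
code is in the autoserv entry point):

    # the pid was written to .autoserv_execute when autoserv started;
    # append the exit status and the new failed-test count on completion
    pidfile = open(pidfile_path, 'a')
    pidfile.write('%d\n' % exit_code)
    pidfile.write('%d\n' % num_tests_failed)
    pidfile.close()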

Other changes:
-added support for displaying Job.run_verify in the CLI (job stat -v) and on the AFE job detail page
-updated ModelExtensions to convert BooleanField values to actual booleans (see the sketch after this list).  The MySQL Django backend just leaves them as ints, as they're represented in the DB; since bool is a subclass of int that's often not a problem, but it can be.
-removed the use of Job.synch_count, since we don't actually support it.  I think this was meant for a previous change and got left out.
-made the scheduler use the new setup_django_environment stuff to import and use the django models.  It doesn't *really* use the models yet -- it just uses the Job.Reboot{Before,After} enum objects -- but this shows we could easily start using the models, and that's definitely the direction I want to go long term.
-refactored PidfileRunMonitor and made it more robust: it now emails errors about corrupt pidfiles and continues gracefully, instead of crashing the scheduler
-changed the way Agent.tick() works.  It now runs through as much work as it can in a single call.  For example, if there's a RebootTask and a VerifyTask and the RebootTask has just finished, a single call will finish up the RebootTask and start the VerifyTask.  This used to take two cycles, which was problematic for cases like this one: the RebootTask wants to set host.status=Ready, but the host could then get snatched up in the next scheduling round before the VerifyTask got started.  Previously this was sort of solved by keeping the HostQueueEntry active, and we could apply that approach here by adding a new HostQueueEntry status like "Rebooting", but I prefer the new approach: I think it's more efficient, more powerful, and easier to work with.
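
A sketch of the BooleanField conversion mentioned above (the helper name is
hypothetical; the real code lives in frontend/afe/model_logic.py):

    from django.db import models as dbmodels

    def _fix_boolean_fields(instance):
        # MySQL hands BooleanField values back as the 0/1 ints stored in
        # the DB; coerce them to real bools so callers can rely on the type
        for field in instance._meta.fields:
            if isinstance(field, dbmodels.BooleanField):
                value = getattr(instance, field.attname)
                if value is not None:
                    setattr(instance, field.attname, bool(value))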

Risk: extremely high
Visibility: new reboot options for jobs; skip verify (run_verify) now displayed in AFE + CLI

Signed-off-by: Steve Howard <showard@google.com>


git-svn-id: http://test.kernel.org/svn/autotest/trunk@2308 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index a1f2179..21c15b0 100644
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -8,9 +8,11 @@
 import datetime, errno, MySQLdb, optparse, os, pwd, Queue, re, shutil, signal
 import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
 import common
+from autotest_lib.frontend import setup_django_environment
 from autotest_lib.client.common_lib import global_config
 from autotest_lib.client.common_lib import host_protections, utils
 from autotest_lib.database import database_connection
+from autotest_lib.frontend.afe import models
 
 
 RESULTS_DIR = '.'
@@ -595,14 +597,14 @@
         for queue_entry in queue_entries:
             run_monitor = PidfileRunMonitor(
                 queue_entry.results_dir())
-            pid, exit_code = run_monitor.get_pidfile_info()
-            if pid is None:
+            if not run_monitor.has_pid():
                 # autoserv apparently never got run, so requeue
                 requeue_entries.append(queue_entry)
                 continue
             if queue_entry.id in recovered_entry_ids:
                 # synchronous job we've already recovered
                 continue
+            pid = run_monitor.get_pid()
             print 'Recovering queue entry %d (pid %d)' % (
                 queue_entry.id, pid)
             job = queue_entry.job
@@ -877,19 +879,34 @@
 
 
 class PidfileRunMonitor(RunMonitor):
+    class PidfileState(object):
+        pid = None
+        exit_status = None
+        num_tests_failed = None
+
+        def reset(self):
+            self.pid = self.exit_status = self.num_tests_failed = None
+
+
     def __init__(self, results_dir, cmd=None, nice_level=None,
                  log_file=None):
         self.results_dir = os.path.abspath(results_dir)
         self.pid_file = os.path.join(results_dir, AUTOSERV_PID_FILE)
         self.lost_process = False
         self.start_time = time.time()
+        self._state = self.PidfileState()
         super(PidfileRunMonitor, self).__init__(cmd, nice_level, log_file)
 
 
+    def has_pid(self):
+        self._get_pidfile_info()
+        return self._state.pid is not None
+
+
     def get_pid(self):
-        pid, exit_status = self.get_pidfile_info()
-        assert pid is not None
-        return pid
+        self._get_pidfile_info()
+        assert self._state.pid is not None
+        return self._state.pid
 
 
     def _check_command_line(self, command_line, spacer=' ',
@@ -902,8 +919,8 @@
         return match
 
 
-    def _check_proc_fs(self, pid):
-        cmdline_path = os.path.join('/proc', str(pid), 'cmdline')
+    def _check_proc_fs(self):
+        cmdline_path = os.path.join('/proc', str(self._state.pid), 'cmdline')
         try:
             cmdline_file = open(cmdline_path, 'r')
             cmdline = cmdline_file.read().strip()
@@ -915,24 +932,29 @@
                                         print_error=True)
 
 
-    def read_pidfile(self):
+    def _read_pidfile(self):
+        self._state.reset()
         if not os.path.exists(self.pid_file):
-            return None, None
+            return
         file_obj = open(self.pid_file, 'r')
         lines = file_obj.readlines()
         file_obj.close()
-        assert 1 <= len(lines) <= 2
+        if not 1 <= len(lines) <= 3:
+            raise PidfileException('Corrupt pid file (%d lines) at %s:\n%s' %
+                                   (len(lines), self.pid_file, lines))
         try:
-            pid = int(lines[0])
-            exit_status = None
-            if len(lines) == 2:
-                exit_status = int(lines[1])
+            self._state.pid = int(lines[0])
+            if len(lines) > 1:
+                self._state.exit_status = int(lines[1])
+                if len(lines) == 3:
+                    self._state.num_tests_failed = int(lines[2])
+                else:
+                    # maintain backwards-compatibility with two-line pidfiles
+                    self._state.num_tests_failed = 0
         except ValueError, exc:
             raise PidfileException('Corrupt pid file: ' +
                                    str(exc.args))
 
-        return pid, exit_status
-
 
     def _find_autoserv_proc(self):
         autoserv_procs = Dispatcher.find_autoservs()
@@ -942,43 +964,55 @@
         return None, None
 
 
-    def get_pidfile_info(self):
-        """\
-        Returns:
-         None, None if autoserv has not yet run
-         pid,  None if autoserv is running
-         pid, exit_status if autoserv has completed
-        """
+    def _handle_pidfile_error(self, error, message=''):
+        message = error + '\nPid: %s\nPidfile: %s\n%s' % (self._state.pid,
+                                                          self.pid_file,
+                                                          message)
+        print message
+        email_manager.enqueue_notify_email(error, message)
+        if self._state.pid is not None:
+            pid = self._state.pid
+        else:
+            pid = 0
+        self.on_lost_process(pid)
+
+
+    def _get_pidfile_info_helper(self):
         if self.lost_process:
-            return self.pid, self.exit_status
+            return
 
-        pid, exit_status = self.read_pidfile()
+        self._read_pidfile()
 
-        if pid is None:
-            return self._handle_no_pid()
+        if self._state.pid is None:
+            self._handle_no_pid()
+            return
 
-        if exit_status is None:
+        if self._state.exit_status is None:
             # double check whether or not autoserv is running
-            proc_running = self._check_proc_fs(pid)
+            proc_running = self._check_proc_fs()
             if proc_running:
-                return pid, exit_status
+                return
 
             # pid but no process - maybe process *just* exited
-            pid, exit_status = self.read_pidfile()
-            if exit_status is None:
+            self._read_pidfile()
+            if self._state.exit_status is None:
                 # autoserv exited without writing an exit code
                 # to the pidfile
-                error = ('autoserv died without writing exit '
-                         'code')
-                message = error + '\nPid: %s\nPidfile: %s' % (
-                    pid, self.pid_file)
-                print message
-                email_manager.enqueue_notify_email(error,
-                                                   message)
-                self.on_lost_process(pid)
-                return self.pid, self.exit_status
+                self._handle_pidfile_error(
+                    'autoserv died without writing exit code')
 
-        return pid, exit_status
+
+    def _get_pidfile_info(self):
+        """\
+        After completion, self._state will contain:
+         pid=None, exit_status=None if autoserv has not yet run
+         pid!=None, exit_status=None if autoserv is running
+         pid!=None, exit_status!=None if autoserv has completed
+        """
+        try:
+            self._get_pidfile_info_helper()
+        except PidfileException, exc:
+            self._handle_pidfile_error('Pidfile error', traceback.format_exc())
 
 
     def _handle_no_pid(self):
@@ -1003,9 +1037,7 @@
             else:
                 pid = 0
             self.on_lost_process(pid)
-            return self.pid, self.exit_status
-
-        return None, None
+            return
 
 
     def on_lost_process(self, pid):
@@ -1018,13 +1050,20 @@
         pid is unimportant here, as it shouldn't be used by anyone.
         """
         self.lost_process = True
-        self.pid = pid
-        self.exit_status = 1
+        self._state.pid = pid
+        self._state.exit_status = 1
+        self._state.num_tests_failed = 0
 
 
     def exit_code(self):
-        pid, exit_code = self.get_pidfile_info()
-        return exit_code
+        self._get_pidfile_info()
+        return self._state.exit_status
+
+
+    def num_tests_failed(self):
+        self._get_pidfile_info()
+        assert self._state.num_tests_failed is not None
+        return self._state.num_tests_failed
 
 
 class Agent(object):
@@ -1045,11 +1084,12 @@
 
 
     def tick(self):
-        print "agent tick"
-        if self.active_task and not self.active_task.is_done():
-            self.active_task.poll()
-        else:
-            self._next_task();
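+        # Run through as much work as possible in a single call: when the
+        # active task finishes, advance to the next task immediately rather
+        # than waiting for the next dispatcher cycle.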
+        while not self.is_done():
+            if self.active_task and not self.active_task.is_done():
+                self.active_task.poll()
+                if not self.active_task.is_done():
+                    return
+            self._next_task()
 
 
     def _next_task(self):
@@ -1353,6 +1393,7 @@
             print "starting queue_task on %s/%s" % (queue_entry.host.hostname, queue_entry.id)
             queue_entry.set_status('Running')
             queue_entry.host.set_status('Running')
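+            # mark the host dirty; it stays dirty until a successful reboot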
+            queue_entry.host.update_field('dirty', 1)
         if (not self.job.is_synchronous() and
             self.job.num_machines() > 1):
             assert len(self.queue_entries) == 1
@@ -1395,6 +1436,21 @@
         self._finish_task()
 
 
+    def _reboot_hosts(self):
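+        # Launch RebootTasks only when the job's reboot_after setting calls
+        # for it; any other value (i.e. NEVER) skips the reboot.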
+        reboot_after = self.job.reboot_after
+        do_reboot = False
+        if reboot_after == models.Job.RebootAfter.ALWAYS:
+            do_reboot = True
+        elif reboot_after == models.Job.RebootAfter.IF_ALL_TESTS_PASSED:
+            num_tests_failed = self.monitor.num_tests_failed()
+            do_reboot = (self.success and num_tests_failed == 0)
+
+        if do_reboot:
+            for queue_entry in self.queue_entries:
+                reboot_task = RebootTask(queue_entry.get_host())
+                self.agent.dispatcher.add_agent(Agent([reboot_task]))
+
+
     def epilog(self):
         super(QueueTask, self).epilog()
         if self.success:
@@ -1407,6 +1463,7 @@
             queue_entry.host.set_status('Ready')
 
         self._finish_task()
+        self._reboot_hosts()
 
         print "queue_task finished with %s/%s" % (status, self.success)
 
@@ -1447,6 +1504,13 @@
         self.host.set_status("Rebooting")
 
 
+    def epilog(self):
+        super(RebootTask, self).epilog()
+        self.host.set_status('Ready')
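+        # a successful reboot leaves the host clean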
+        if self.success:
+            self.host.update_field('dirty', 0)
+
+
 class AbortTask(AgentTask):
     def __init__(self, queue_entry, agents_to_abort):
         self.queue_entry = queue_entry
@@ -1621,7 +1685,7 @@
     @classmethod
     def _fields(cls):
         return ['id', 'hostname', 'locked', 'synch_id','status',
-                'invalid', 'protection', 'locked_by_id', 'lock_time']
+                'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty']
 
 
     def current_task(self):
@@ -1912,7 +1976,7 @@
         return  ['id', 'owner', 'name', 'priority', 'control_file',
                  'control_type', 'created_on', 'synch_type',
                  'synch_count', 'synchronizing', 'timeout',
-                 'run_verify', 'email_list']
+                 'run_verify', 'email_list', 'reboot_before', 'reboot_after']
 
 
     def is_server_job(self):
@@ -2032,9 +2096,8 @@
         job_tag = self._get_job_tag(queue_entries)
 
         params = [_autoserv_path, '-P', job_tag, '-p', '-n',
-                  '-r', os.path.abspath(results_dir),
-                  '-b', '-u', self.owner, '-l', self.name,
-                  '-m', hostnames, control_filename]
+                  '-r', os.path.abspath(results_dir), '-u', self.owner,
+                  '-l', self.name, '-m', hostnames, control_filename]
 
         if not self.is_server_job():
             params.append('-c')
@@ -2042,10 +2105,26 @@
         return params
 
 
+    def _get_pre_job_tasks(self, queue_entry, verify_task_class=VerifyTask):
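+        # Honor the job's reboot_before setting: ALWAYS reboots
+        # unconditionally; IF_DIRTY reboots only when the host is dirty.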
+        do_reboot = False
+        if self.reboot_before == models.Job.RebootBefore.ALWAYS:
+            do_reboot = True
+        elif self.reboot_before == models.Job.RebootBefore.IF_DIRTY:
+            do_reboot = queue_entry.get_host().dirty
+
+        tasks = []
+        if do_reboot:
+            tasks.append(RebootTask(queue_entry.get_host()))
+        tasks.append(verify_task_class(queue_entry=queue_entry))
+        return tasks
+
+
     def _run_synchronous(self, queue_entry):
         if not self.is_ready():
             if self.run_verify:
-                return Agent([VerifySynchronousTask(queue_entry=queue_entry)], [queue_entry.id])
+                return Agent(self._get_pre_job_tasks(queue_entry,
+                                                     VerifySynchronousTask),
+                             [queue_entry.id])
             else:
                 return queue_entry.on_pending()
 
@@ -2055,13 +2134,9 @@
 
 
     def _run_asynchronous(self, queue_entry):
-        # TODO(showard): this is of questionable necessity, but in the interest
-        # of lowering risk, I'm leaving it in for now
-        assert queue_entry
-
         initial_tasks = []
         if self.run_verify:
-            initial_tasks = [VerifyTask(queue_entry)]
+            initial_tasks = self._get_pre_job_tasks(queue_entry)
         return self._finish_run([queue_entry], initial_tasks)