-ensure Django connection is autocommit enabled, when used from monitor_db
-fix HostScheduler to not crash when there are no ready hosts
-change RebootTask to optionally take a queue entry and pass it to the RepairTask if reboot fails.  This allows jobs to be failed if the pre-verify reboot fails, instead of being left hanging.
-add unit test for RebootTask
-add check for DB inconsistencies to cleanup step.  Currently this just checks for HQEs with active=complete=1.
-when unexpected existing results files are found, email a warning



git-svn-id: http://test.kernel.org/svn/autotest/trunk@2368 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/frontend/setup_django_environment.py b/frontend/setup_django_environment.py
index 28d3630..7db92dc 100644
--- a/frontend/setup_django_environment.py
+++ b/frontend/setup_django_environment.py
@@ -3,3 +3,8 @@
 from autotest_lib.frontend import settings
 
 management.setup_environ(settings)
+
+def enable_autocommit():
+    from django.db import connection
+    connection.cursor() # ensure a connection is open
+    connection.connection.autocommit(True)
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 3975f3c..26a70d7 100644
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -124,6 +124,9 @@
     _db = database_connection.DatabaseConnection(CONFIG_SECTION)
     _db.connect()
 
+    # ensure Django connection is in autocommit
+    setup_django_environment.enable_autocommit()
+
     print "Setting signal handler"
     signal.signal(signal.SIGINT, handle_sigint)
 
@@ -321,6 +324,8 @@
 
     @classmethod
     def _get_label_hosts(cls, host_ids):
+        if not host_ids:
+            return {}, {}
         query = """
         SELECT label_id, host_id
         FROM hosts_labels
@@ -478,6 +483,7 @@
             self._abort_timed_out_jobs()
             self._abort_jobs_past_synch_start_timeout()
             self._clear_inactive_blocks()
+            self._check_for_db_inconsistencies()
             self._last_clean_time = time.time()
 
 
@@ -807,6 +813,20 @@
         print num_running_processes, 'running processes'
 
 
+    def _check_for_db_inconsistencies(self):
+        query = models.HostQueueEntry.objects.filter(active=True, complete=True)
+        if query.count() != 0:
+            subject = ('%d queue entries found with active=complete=1'
+                       % query.count())
+            message = '\n'.join(str(entry.get_object_dict())
+                                for entry in query[:50])
+            if len(query) > 50:
+                message += '\n(truncated)\n'
+
+            print subject
+            email_manager.enqueue_notify_email(subject, message)
+
+
 class RunMonitor(object):
     def __init__(self, cmd, nice_level = None, log_file = None):
         self.nice_level = nice_level
@@ -1312,8 +1332,9 @@
         if it exists, even if it's a directory.
         """
         if os.path.exists(dest):
-            print ('Warning: removing existing destination file ' +
-                   dest)
+            warning = 'Warning: removing existing destination file ' + dest
+            print warning
+            email_manager.enqueue_notify_email(warning, warning)
             remove_file_or_dir(dest)
         shutil.move(source, dest)
 
@@ -1439,7 +1460,9 @@
 
         if do_reboot:
             for queue_entry in self.queue_entries:
-                reboot_task = RebootTask(queue_entry.get_host())
+                # don't pass the queue entry to the RebootTask. if the reboot
+                # fails, the job doesn't care -- it's over.
+                reboot_task = RebootTask(host=queue_entry.get_host())
                 self.agent.dispatcher.add_agent(Agent([reboot_task]))
 
 
@@ -1473,8 +1496,10 @@
 
 
 class RebootTask(AgentTask):
-    def __init__(self, host):
-        global _autoserv_path
+    def __init__(self, host=None, queue_entry=None):
+        assert bool(host) ^ bool(queue_entry)
+        if queue_entry:
+            host = queue_entry.get_host()
 
         # Current implementation of autoserv requires control file
         # to be passed on reboot action request. TODO: remove when no
@@ -1482,9 +1507,10 @@
         self.create_temp_resultsdir('.reboot')
         self.cmd = [_autoserv_path, '-b', '-m', host.hostname,
                     '-r', self.temp_results_dir, '/dev/null']
+        self.queue_entry = queue_entry
         self.host = host
-        super(RebootTask, self).__init__(self.cmd,
-                           failure_tasks=[RepairTask(host)])
+        repair_task = RepairTask(host, fail_queue_entry=queue_entry)
+        super(RebootTask, self).__init__(self.cmd, failure_tasks=[repair_task])
 
 
     def prolog(self):
@@ -1494,9 +1520,11 @@
 
     def epilog(self):
         super(RebootTask, self).epilog()
-        self.host.set_status('Ready')
         if self.success:
+            self.host.set_status('Ready')
             self.host.update_field('dirty', 0)
+        elif self.queue_entry:
+            self.queue_entry.requeue()
 
 
 class AbortTask(AgentTask):
@@ -2037,7 +2065,7 @@
 
         host = self.get_host()
         if host:
-            reboot_task = RebootTask(host)
+            reboot_task = RebootTask(host=host)
             verify_task = VerifyTask(host=host)
             # just to make sure this host does not get taken away
             host.set_status('Rebooting')
@@ -2207,7 +2235,7 @@
 
         tasks = []
         if do_reboot:
-            tasks.append(RebootTask(queue_entry.get_host()))
+            tasks.append(RebootTask(queue_entry=queue_entry))
         tasks.append(verify_task_class(queue_entry=queue_entry))
         return tasks
 
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py
index 757f104..d62a1bc 100644
--- a/scheduler/monitor_db_unittest.py
+++ b/scheduler/monitor_db_unittest.py
@@ -376,6 +376,13 @@
         self._check_for_extra_schedulings()
 
 
+    def test_no_ready_hosts(self):
+        self._create_job(hosts=[1])
+        self._do_query('UPDATE hosts SET status="Repair Failed"')
+        self._dispatcher._schedule_new_jobs()
+        self._check_for_extra_schedulings()
+
+
 class DispatcherThrottlingTest(BaseSchedulerTest):
     """
     Test that the dispatcher throttles:
@@ -1050,6 +1057,45 @@
         self.god.check_playback()
 
 
+    def _test_reboot_task_helper(self, success, use_queue_entry=False):
+        if use_queue_entry:
+            self.queue_entry.get_host.expect_call().and_return(self.host)
+        self.host.set_status.expect_call('Rebooting')
+        if success:
+            self.setup_run_monitor(0)
+            self.host.set_status.expect_call('Ready')
+            self.host.update_field.expect_call('dirty', 0)
+        else:
+            self.setup_run_monitor(1)
+            if use_queue_entry:
+                self.queue_entry.requeue.expect_call()
+
+        if use_queue_entry:
+            task = monitor_db.RebootTask(queue_entry=self.queue_entry)
+        else:
+            task = monitor_db.RebootTask(host=self.host)
+        self.assertEquals(len(task.failure_tasks), 1)
+        repair_task = task.failure_tasks[0]
+        self.assert_(isinstance(repair_task, monitor_db.RepairTask))
+        if use_queue_entry:
+            self.assertEquals(repair_task.fail_queue_entry, self.queue_entry)
+
+        self.run_task(task, success)
+
+        self.god.check_playback()
+        self.assert_(set(task.monitor.cmd) >=
+                        set(['autoserv', '-b', '-m', self.HOSTNAME,
+                             '-r', self.TEMP_DIR, '/dev/null']))
+
+    def test_reboot_task(self):
+        self._test_reboot_task_helper(True)
+        self._test_reboot_task_helper(False)
+
+
+    def test_reboot_task_with_queue_entry(self):
+        self._test_reboot_task_helper(False, True)
+
+
 class JobTest(BaseSchedulerTest):
     def _test_run_helper(self, expect_agent=True):
         job = monitor_db.Job.fetch('id = 1').next()