-ensure the Django connection has autocommit enabled when it is used from monitor_db
-fix HostScheduler to not crash when there are no ready hosts
-change RebootTask to optionally take a queue entry and pass it to the RepairTask that runs if the reboot fails. This allows jobs to be failed if the pre-verify reboot fails, instead of being left hanging.
-add unit test for RebootTask
-add a check for DB inconsistencies to the cleanup step. Currently this only checks for host queue entries (HQEs) with active=complete=1.
-when unexpected existing results files are found, email a warning in addition to printing it
git-svn-id: http://test.kernel.org/svn/autotest/trunk@2368 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/frontend/setup_django_environment.py b/frontend/setup_django_environment.py
index 28d3630..7db92dc 100644
--- a/frontend/setup_django_environment.py
+++ b/frontend/setup_django_environment.py
@@ -3,3 +3,8 @@
from autotest_lib.frontend import settings
management.setup_environ(settings)
+
+def enable_autocommit():
+ from django.db import connection
+ connection.cursor() # ensure a connection is open
+ connection.connection.autocommit(True)
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 3975f3c..26a70d7 100644
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -124,6 +124,9 @@
_db = database_connection.DatabaseConnection(CONFIG_SECTION)
_db.connect()
+ # ensure Django connection is in autocommit
+ setup_django_environment.enable_autocommit()
+
print "Setting signal handler"
signal.signal(signal.SIGINT, handle_sigint)
@@ -321,6 +324,8 @@
@classmethod
def _get_label_hosts(cls, host_ids):
+ if not host_ids:
+ return {}, {}
query = """
SELECT label_id, host_id
FROM hosts_labels
@@ -478,6 +483,7 @@
self._abort_timed_out_jobs()
self._abort_jobs_past_synch_start_timeout()
self._clear_inactive_blocks()
+ self._check_for_db_inconsistencies()
self._last_clean_time = time.time()
@@ -807,6 +813,20 @@
print num_running_processes, 'running processes'
+ def _check_for_db_inconsistencies(self):
+ query = models.HostQueueEntry.objects.filter(active=True, complete=True)
+ if query.count() != 0:
+ subject = ('%d queue entries found with active=complete=1'
+ % query.count())
+ message = '\n'.join(str(entry.get_object_dict())
+ for entry in query[:50])
+ if len(query) > 50:
+ message += '\n(truncated)\n'
+
+ print subject
+ email_manager.enqueue_notify_email(subject, message)
+
+
class RunMonitor(object):
def __init__(self, cmd, nice_level = None, log_file = None):
self.nice_level = nice_level
@@ -1312,8 +1332,9 @@
if it exists, even if it's a directory.
"""
if os.path.exists(dest):
- print ('Warning: removing existing destination file ' +
- dest)
+ warning = 'Warning: removing existing destination file ' + dest
+ print warning
+ email_manager.enqueue_notify_email(warning, warning)
remove_file_or_dir(dest)
shutil.move(source, dest)
@@ -1439,7 +1460,9 @@
if do_reboot:
for queue_entry in self.queue_entries:
- reboot_task = RebootTask(queue_entry.get_host())
+ # don't pass the queue entry to the RebootTask. if the reboot
+ # fails, the job doesn't care -- it's over.
+ reboot_task = RebootTask(host=queue_entry.get_host())
self.agent.dispatcher.add_agent(Agent([reboot_task]))
@@ -1473,8 +1496,10 @@
class RebootTask(AgentTask):
- def __init__(self, host):
- global _autoserv_path
+ def __init__(self, host=None, queue_entry=None):
+ assert bool(host) ^ bool(queue_entry)
+ if queue_entry:
+ host = queue_entry.get_host()
# Current implementation of autoserv requires control file
# to be passed on reboot action request. TODO: remove when no
@@ -1482,9 +1507,10 @@
self.create_temp_resultsdir('.reboot')
self.cmd = [_autoserv_path, '-b', '-m', host.hostname,
'-r', self.temp_results_dir, '/dev/null']
+ self.queue_entry = queue_entry
self.host = host
- super(RebootTask, self).__init__(self.cmd,
- failure_tasks=[RepairTask(host)])
+ repair_task = RepairTask(host, fail_queue_entry=queue_entry)
+ super(RebootTask, self).__init__(self.cmd, failure_tasks=[repair_task])
def prolog(self):
@@ -1494,9 +1520,11 @@
def epilog(self):
super(RebootTask, self).epilog()
- self.host.set_status('Ready')
if self.success:
+ self.host.set_status('Ready')
self.host.update_field('dirty', 0)
+ elif self.queue_entry:
+ self.queue_entry.requeue()
class AbortTask(AgentTask):
@@ -2037,7 +2065,7 @@
host = self.get_host()
if host:
- reboot_task = RebootTask(host)
+ reboot_task = RebootTask(host=host)
verify_task = VerifyTask(host=host)
# just to make sure this host does not get taken away
host.set_status('Rebooting')
@@ -2207,7 +2235,7 @@
tasks = []
if do_reboot:
- tasks.append(RebootTask(queue_entry.get_host()))
+ tasks.append(RebootTask(queue_entry=queue_entry))
tasks.append(verify_task_class(queue_entry=queue_entry))
return tasks
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py
index 757f104..d62a1bc 100644
--- a/scheduler/monitor_db_unittest.py
+++ b/scheduler/monitor_db_unittest.py
@@ -376,6 +376,13 @@
self._check_for_extra_schedulings()
+ def test_no_ready_hosts(self):
+ self._create_job(hosts=[1])
+ self._do_query('UPDATE hosts SET status="Repair Failed"')
+ self._dispatcher._schedule_new_jobs()
+ self._check_for_extra_schedulings()
+
+
class DispatcherThrottlingTest(BaseSchedulerTest):
"""
Test that the dispatcher throttles:
@@ -1050,6 +1057,45 @@
self.god.check_playback()
+ def _test_reboot_task_helper(self, success, use_queue_entry=False):
+ if use_queue_entry:
+ self.queue_entry.get_host.expect_call().and_return(self.host)
+ self.host.set_status.expect_call('Rebooting')
+ if success:
+ self.setup_run_monitor(0)
+ self.host.set_status.expect_call('Ready')
+ self.host.update_field.expect_call('dirty', 0)
+ else:
+ self.setup_run_monitor(1)
+ if use_queue_entry:
+ self.queue_entry.requeue.expect_call()
+
+ if use_queue_entry:
+ task = monitor_db.RebootTask(queue_entry=self.queue_entry)
+ else:
+ task = monitor_db.RebootTask(host=self.host)
+ self.assertEquals(len(task.failure_tasks), 1)
+ repair_task = task.failure_tasks[0]
+ self.assert_(isinstance(repair_task, monitor_db.RepairTask))
+ if use_queue_entry:
+ self.assertEquals(repair_task.fail_queue_entry, self.queue_entry)
+
+ self.run_task(task, success)
+
+ self.god.check_playback()
+ self.assert_(set(task.monitor.cmd) >=
+ set(['autoserv', '-b', '-m', self.HOSTNAME,
+ '-r', self.TEMP_DIR, '/dev/null']))
+
+ def test_reboot_task(self):
+ self._test_reboot_task_helper(True)
+ self._test_reboot_task_helper(False)
+
+
+ def test_reboot_task_with_queue_entry(self):
+ self._test_reboot_task_helper(False, True)
+
+
class JobTest(BaseSchedulerTest):
def _test_run_helper(self, expect_agent=True):
job = monitor_db.Job.fetch('id = 1').next()