Verify repair fixes for scheduler
-change Agent and AgentTask to support a clear_queue_on_failure constructor
param to AgentTasks. If True (default), behavior is as before. If False, the
rest of the queue will not be cleared on task failure - failure tasks will be
inserted before the rest of the tasks.
-successful RepairTask sets host status to 'Repair Succeeded', not 'Ready',
because successful repair is no guarantee the host is actually OK. We depend on
a ReverifyTask to follow the RepairTask and verify the machine is actually
ready.
-added ReverifyTask and modified VerifyTask to support flowchart as posted at
http://test.kernel.org/autotest/SchedulerSpecification.
-added ReverifySynchronousTask, and refactored all the verify stuff (including
adding a VerifySynchronousMixin) to cleanly support both synch and async verify
and reverify. This includes a fair bit of use of template methods.
-moved some logic from VerifyTask and VerifySynchronousTask out into
HostQueueEntry and Job, including figuring out the verify results dir and
handling host verify/repair failure (requeue a meta-host, fail a non-meta-host,
stop other entries for a synch job). This is cleaner with the increased
complexity of verify/repair.
-modified results moving code to just overwrite existing files/directories. It
prints a warning when it does this, but doesn't crash anymore. Phew.
-modified machines file logic to only write a .machines file for a multi-machine
async job, and write it on-the-fly as each host runs.
-made meta-host entries unblock hosts when releasing them - releasing only
happens when a host fails verify + repair, and in this case, we want to allow
assigning back to that host if it should get repaired later
-added DBObject.fetch() and DBObject.delete() to support host unblocking. Had
to change how DBObject gets the table name, so that DBObject.fetch(), which
must be a class method, could access the table name.
-added timestamp to queue.log logging
-global statements are only necessary when writing globals
Signed-off-by: Steve Howard <showard@google.com>
git-svn-id: http://test.kernel.org/svn/autotest/trunk@1281 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/scheduler/monitor_db b/scheduler/monitor_db
index f89c846..b2db1a1 100755
--- a/scheduler/monitor_db
+++ b/scheduler/monitor_db
@@ -6,7 +6,7 @@
__author__ = "Paul Turner <pjt@google.com>"
import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
-import optparse, signal, smtplib, socket
+import optparse, signal, smtplib, socket, datetime, stat
from common import global_config
RESULTS_DIR = '.'
@@ -125,6 +125,15 @@
return hosts
+def remove_file_or_dir(path):
+ if stat.S_ISDIR(os.stat(path).st_mode):
+ # directory
+ shutil.rmtree(path)
+ else:
+ # file
+ os.remove(path)
+
+
class DatabaseConn:
def __init__(self):
self.reconnect_wait = 20
@@ -186,7 +195,6 @@
def parse_results(results_dir, flags=""):
- global _testing_mode
if _testing_mode:
return
parse = os.path.join(AUTOTEST_TKO_DIR, 'parse')
@@ -372,11 +380,9 @@
if self.active_task:
assert self.active_task.is_done()
- if not self.active_task.success and self.active_task.failure_tasks:
- self.queue = Queue.Queue(0)
- for task in self.active_task.failure_tasks:
- self.add_task(task)
-
+ if not self.active_task.success:
+ self.on_task_failure()
+
self.active_task = None
if not self.is_done():
self.active_task = self.queue.get_nowait()
@@ -384,6 +390,15 @@
self.active_task.start()
+ def on_task_failure(self):
+ old_queue = self.queue
+ self.queue = Queue.Queue(0)
+ for task in self.active_task.failure_tasks:
+ self.add_task(task)
+ if not self.active_task.clear_queue_on_failure:
+ while not old_queue.empty():
+ self.add_task(old_queue.get_nowait())
+
def is_done(self):
return self.active_task == None and self.queue.empty()
@@ -395,9 +410,18 @@
class AgentTask(object):
- def __init__(self, cmd, failure_tasks = None):
+ def __init__(self, cmd, failure_tasks = [],
+ clear_queue_on_failure=True):
+ """\
+ By default, on failure, the Agent's task queue is cleared and
+ replaced with the tasks in failure_tasks. If
+ clear_queue_on_failure=False, the task queue will not be
+ cleared, and the tasks in failure_tasks will be inserted at the
+ beginning of the queue.
+ """
self.done = False
self.failure_tasks = failure_tasks
+ self.clear_queue_on_failure = clear_queue_on_failure
self.started = False
self.cmd = cmd
self.agent = None
@@ -468,10 +492,10 @@
class RepairTask(AgentTask):
def __init__(self, host):
- global _autoserv_path
cmd = [_autoserv_path ,'-n', '-R', '-m', host.hostname]
self.host = host
- AgentTask.__init__(self, cmd)
+ AgentTask.__init__(self, cmd, clear_queue_on_failure=False)
+
def prolog(self):
print "repair_task starting"
@@ -480,7 +504,7 @@
def epilog(self):
if self.success:
- status = 'Ready'
+ status = 'Repair Succeeded'
else:
status = 'Repair Failed'
@@ -495,11 +519,19 @@
self.queue_entry = queue_entry
self.temp_results_dir = tempfile.mkdtemp(suffix='.verify')
- global _autoserv_path
cmd = [_autoserv_path,'-n', '-v','-m',self.host.hostname,
'-r', self.temp_results_dir]
- AgentTask.__init__(self, cmd, failure_tasks = [RepairTask(self.host)])
+ failure_tasks = self.get_failure_tasks()
+
+ AgentTask.__init__(self, cmd, failure_tasks=failure_tasks,
+ clear_queue_on_failure=False)
+
+
+ def get_failure_tasks(self):
+ 'To be overridden'
+ return [RepairTask(self.host),
+ ReverifyTask(self.queue_entry, self.host)]
def prolog(self):
@@ -513,68 +545,115 @@
if self.queue_entry and (self.success or
not self.queue_entry.meta_host):
self.move_results()
- else:
- shutil.rmtree(self.temp_results_dir)
+ shutil.rmtree(self.temp_results_dir)
if self.success:
- status = 'Ready'
+ self.on_success()
else:
- status = 'Failed Verify'
- if self.queue_entry:
- if self.queue_entry.meta_host:
- self.host.yield_work()
- else:
- self.queue_entry.set_status('Failed')
+ self.on_failure()
- self.host.set_status(status)
+
+ def on_success(self):
+ self.host.set_status('Ready')
+
+
+ def on_failure(self):
+ self.host.set_status('Failed Verify')
+ # don't use queue_entry.requeue() here, because we don't want
+ # a meta-host entry to release its host yet - that should only
+ # happen after reverify fails
+ if self.queue_entry:
+ self.queue_entry.set_status('Queued')
def move_results(self):
assert self.queue_entry is not None
- target_dir = self.queue_entry.results_dir()
- if self.queue_entry.job.is_synchronous():
- target_dir = os.path.join(target_dir,
- self.queue_entry.host.hostname)
+ target_dir = self.queue_entry.verify_results_dir()
if not os.path.exists(target_dir):
os.makedirs(target_dir)
files = os.listdir(self.temp_results_dir)
for filename in files:
- shutil.move(os.path.join(self.temp_results_dir,
- filename),
- os.path.join(target_dir, filename))
+ self.force_move(os.path.join(self.temp_results_dir,
+ filename),
+ os.path.join(target_dir, filename))
-class VerifySynchronousTask(VerifyTask):
+ @staticmethod
+ def force_move(source, dest):
+ """\
+ Replacement for shutil.move() that will delete the destination
+ if it exists, even if it's a directory.
+ """
+ if os.path.exists(dest):
+ print ('Warning: removing existing destination file ' +
+ dest)
+ remove_file_or_dir(dest)
+ shutil.move(source, dest)
+
+
+class ReverifyTask(VerifyTask):
+ def __init__(self, queue_entry=None, host=None):
+ if queue_entry:
+ VerifyTask.__init__(self, queue_entry=queue_entry)
+ else:
+ VerifyTask.__init__(self, host=host)
+ self.clear_queue_on_failure = True
+
+
+ def get_failure_tasks(self):
+ return []
+
+
+ def prolog(self):
+ VerifyTask.prolog(self)
+ if self.queue_entry:
+ self.queue_entry.clear_results_dir(
+ self.queue_entry.verify_results_dir())
+
+
+ def on_failure(self):
+ self.host.set_status('Repair Failed')
+ if self.queue_entry:
+ self.queue_entry.handle_host_failure()
+
+
+class VerifySynchronousMixin(object):
+ def on_pending(self):
+ if self.queue_entry.job.num_complete() > 0:
+ # some other entry failed verify, and we've
+ # already been marked as stopped
+ return
+
+ self.queue_entry.set_status('Pending')
+ job = self.queue_entry.job
+ if job.is_ready():
+ agent = job.run(self.queue_entry)
+ self.agent.dispatcher.add_agent(agent)
+
+
+class VerifySynchronousTask(VerifyTask, VerifySynchronousMixin):
def __init__(self, queue_entry):
VerifyTask.__init__(self, queue_entry = queue_entry)
- def epilog(self):
- VerifyTask.epilog(self)
- print "verify_synchronous finished: %s/%s" % (self.queue_entry.host.hostname, self.success)
- if self.success:
- if self.queue_entry.job.num_complete()==0:
- self.queue_entry.set_status('Pending')
-
- job = self.queue_entry.job
- if job.is_ready():
- self.agent.dispatcher.add_agent(job.run(self.queue_entry))
- else:
- self.queue_entry.set_status('Stopped') # some other entry failed verify
- else:
- if self.queue_entry.meta_host:
- self.queue_entry.set_status('Queued')
- else:
- # VerifyTask.epilog() set this queue entry to
- # failed, so it won't be set to stopped
- self.stop_all_entries()
+ def get_failure_tasks(self):
+ return [RepairTask(self.host),
+ ReverifySynchronousTask(self.queue_entry)]
- def stop_all_entries(self):
- job = self.queue_entry.job
- for child_entry in job.get_host_queue_entries():
- if not child_entry.complete:
- child_entry.set_status('Stopped')
+ def on_success(self):
+ VerifyTask.on_success(self)
+ self.on_pending()
+
+
+class ReverifySynchronousTask(ReverifyTask, VerifySynchronousMixin):
+ def __init__(self, queue_entry):
+ ReverifyTask.__init__(self, queue_entry = queue_entry)
+
+
+ def on_success(self):
+ ReverifyTask.on_success(self)
+ self.on_pending()
class QueueTask(AgentTask):
@@ -593,15 +672,20 @@
def prolog(self):
+ # write some job timestamps into the job keyval file
+ queued = time.mktime(self.job.created_on.timetuple())
+ started = time.time()
+ self._write_keyval(self.queue_entries[0], "job_queued", queued)
+ self._write_keyval(self.queue_entries[0], "job_started",
+ started)
for queue_entry in self.queue_entries:
print "starting queue_task on %s/%s" % (queue_entry.host.hostname, queue_entry.id)
queue_entry.set_status('Running')
queue_entry.host.set_status('Running')
- # write some job timestamps into the job keyval file
- queued = time.mktime(self.job.created_on.timetuple())
- started = time.time()
- self._write_keyval(queue_entry, "job_queued", queued)
- self._write_keyval(queue_entry, "job_started", started)
+ if (not self.job.is_synchronous() and
+ self.job.num_machines() > 1):
+ assert len(self.queue_entries) == 1
+ self.job.write_to_machines_file(self.queue_entries[0])
def epilog(self):
@@ -610,13 +694,13 @@
else:
status = 'Failed'
+ # write another timestamp into the job keyval file
+ finished = time.time()
+ self._write_keyval(self.queue_entries[0], "job_finished",
+ finished)
for queue_entry in self.queue_entries:
queue_entry.set_status(status)
queue_entry.host.set_status('Ready')
- # write another timestamp into the job keyval file
- finished = time.time()
- self._write_keyval(queue_entry, "job_finished",
- finished)
if self.job.is_synchronous() or self.job.num_machines()==1:
if self.job.is_finished():
@@ -631,7 +715,6 @@
class RebootTask(AgentTask):
def __init__(self):
AgentTask.__init__(self, host)
- global _autoserv_path
self.cmd = "%s -n -b -m %s /dev/null" % (_autoserv_path, host)
self.host = host
@@ -646,10 +729,10 @@
class DBObject(object):
- def __init__(self, table, fields, id=None, row=None, new_record=False):
- assert (bool(id) != bool(row)) and table and fields
+ def __init__(self, fields, id=None, row=None, new_record=False):
+ assert (bool(id) != bool(row)) and fields
- self.__table = table
+ self.__table = self._get_table()
self.__fields = fields
self.__new_record = new_record
@@ -662,7 +745,9 @@
(self.__table, id)
row = rows[0]
- assert len(row)==len(fields), "table = %s, row = %s/%d, fields = %s/%d" % (table, row, len(row), fields, len(fields))
+ assert len(row)==len(fields), (
+ "table = %s, row = %s/%d, fields = %s/%d" % (
+ self.__table, row, len(row), fields, len(fields)))
self.__valid_fields = {}
for i,value in enumerate(row):
@@ -671,6 +756,12 @@
del self.__valid_fields['id']
+
+ @classmethod
+ def _get_table(cls):
+ raise NotImplementedError('Subclasses must override this')
+
+
def count(self, where, table = None):
if not table:
table = self.__table
@@ -713,19 +804,40 @@
_db.execute(query)
+ def delete(self):
+ query = 'DELETE FROM %s WHERE id=%%s' % self.__table
+ _db.execute(query, (self.id,))
+
+
+ @classmethod
+ def fetch(cls, where):
+ rows = _db.execute(
+ 'SELECT * FROM %s WHERE %s' % (cls._get_table(), where))
+ for row in rows:
+ yield cls(row=row)
+
class IneligibleHostQueue(DBObject):
def __init__(self, id=None, row=None, new_record=None):
fields = ['id', 'job_id', 'host_id']
- DBObject.__init__(self, 'ineligible_host_queues', fields,
- id=id, row=row, new_record=new_record)
+ DBObject.__init__(self, fields, id=id, row=row,
+ new_record=new_record)
+
+
+ @classmethod
+ def _get_table(cls):
+ return 'ineligible_host_queues'
class Host(DBObject):
def __init__(self, id=None, row=None):
fields = ['id', 'hostname', 'locked', 'synch_id','status']
- DBObject.__init__(self, 'hosts',fields, id=id, row=row)
-
+ DBObject.__init__(self, fields, id=id, row=row)
+
+
+ @classmethod
+ def _get_table(cls):
+ return 'hosts'
def current_task(self):
@@ -782,8 +894,7 @@
assert id or row
fields = ['id', 'job_id', 'host_id', 'priority', 'status',
'meta_host', 'active', 'complete']
- DBObject.__init__(self, 'host_queue_entries', fields, id=id,
- row=row)
+ DBObject.__init__(self, fields, id=id, row=row)
self.job = Job(self.job_id)
@@ -796,43 +907,67 @@
'queue.log.' + str(self.id))
+ @classmethod
+ def _get_table(cls):
+ return 'host_queue_entries'
+
+
def set_host(self, host):
if host:
self.queue_log_record('Assigning host ' + host.hostname)
self.update_field('host_id', host.id)
self.update_field('active', True)
+ self.block_host(host.id)
else:
self.queue_log_record('Releasing host')
+ self.unblock_host(self.host.id)
self.update_field('host_id', None)
self.host = host
def get_host(self):
- if not self.host:
- if self.host_id:
- self.host = Host(self.host_id)
- if self.host:
- return self.host
- else:
- return None
+ return self.host
def queue_log_record(self, log_line):
+ now = str(datetime.datetime.now())
queue_log = open(self.queue_log_path, 'a', 0)
- queue_log.write(log_line + '\n')
+ queue_log.write(now + ' ' + log_line + '\n')
queue_log.close()
+ def block_host(self, host_id):
+ print "creating block %s/%s" % (self.job.id, host_id)
+ row = [0, self.job.id, host_id]
+ block = IneligibleHostQueue(row=row, new_record=True)
+ block.save()
+
+
+ def unblock_host(self, host_id):
+ print "removing block %s/%s" % (self.job.id, host_id)
+ blocks = list(IneligibleHostQueue.fetch(
+ 'job_id=%d and host_id=%d' % (self.job.id, host_id)))
+ assert len(blocks) == 1
+ blocks[0].delete()
+
+
def results_dir(self):
- if self.job.num_machines()==1 or self.job.is_synchronous():
- results_dir = self.job.job_dir
+ if self.job.is_synchronous() or self.job.num_machines() == 1:
+ return self.job.job_dir
else:
assert self.host
- results_dir = '%s/%s' % (self.job.job_dir,
- self.host.hostname)
+ return os.path.join(self.job.job_dir,
+ self.host.hostname)
- return results_dir
+
+ def verify_results_dir(self):
+ if self.job.is_synchronous() or self.job.num_machines() > 1:
+ assert self.host
+ return os.path.join(self.job.job_dir,
+ self.host.hostname)
+ else:
+ return self.job.job_dir
def set_status(self, status):
@@ -858,40 +993,62 @@
def run(self,assigned_host=None):
if self.meta_host:
assert assigned_host
+ # ensure results dir exists for the queue log
self.job.create_results_dir()
self.set_host(assigned_host)
- print "creating block %s/%s" % (self.job.id,
- self.host.id)
- row = [0, self.job.id, self.host.id]
- block = IneligibleHostQueue(row=row, new_record=True)
- block.save()
-
print "%s/%s scheduled on %s, status=%s" % (self.job.name,
self.meta_host, self.host.hostname, self.status)
return self.job.run(queue_entry=self)
-
+
def requeue(self):
self.set_status('Queued')
-
+
if self.meta_host:
self.set_host(None)
- def clear_results_dir(self):
- if os.path.exists(self.results_dir()):
- shutil.rmtree(self.results_dir())
+ def handle_host_failure(self):
+ """\
+ Called when this queue entry's host has failed verification and
+ repair.
+ """
+ if self.meta_host:
+ self.requeue()
+ else:
+ self.set_status('Failed')
+ if self.job.is_synchronous():
+ self.job.stop_all_entries()
+
+
+ def clear_results_dir(self, results_dir=None):
+ results_dir = results_dir or self.results_dir()
+ if not os.path.exists(results_dir):
+ return
+ for filename in os.listdir(results_dir):
+ if 'queue.log' in filename:
+ continue
+ path = os.path.join(results_dir, filename)
+ remove_file_or_dir(path)
class Job(DBObject):
def __init__(self, id=None, row=None):
assert id or row
- DBObject.__init__(self,'jobs',['id','owner','name','priority','control_file',
- 'control_type','created_on', 'synch_type', 'synch_count',
- 'synchronizing' ], id=id, row=row)
+ DBObject.__init__(self,
+ ['id','owner','name','priority',
+ 'control_file','control_type','created_on',
+ 'synch_type', 'synch_count','synchronizing'],
+ id=id, row=row)
- self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id, self.owner))
+ self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
+ self.owner))
+
+
+ @classmethod
+ def _get_table(cls):
+ return 'jobs'
def is_server_job(self):
@@ -977,15 +1134,19 @@
self.set_status('Queued', update_queues = False)
- def write_machines_file(self):
- if self.num_machines()>1:
- print "writing machines file"
- mf = open("%s/.machines" % self.job_dir, 'w')
- for queue_entry in self.get_host_queue_entries():
- if queue_entry.get_host():
- mf.write("%s\n" % \
- queue_entry.get_host().hostname)
- mf.close()
+ def stop_all_entries(self):
+ for child_entry in self.get_host_queue_entries():
+ if not child_entry.complete:
+ child_entry.set_status('Stopped')
+
+
+ def write_to_machines_file(self, queue_entry):
+ hostname = queue_entry.get_host().hostname
+ print "writing %s to job %s machines file" % (hostname, self.id)
+ file_path = os.path.join(self.job_dir, '.machines')
+ mf = open(file_path, 'a')
+ mf.write("%s\n" % queue_entry.get_host().hostname)
+ mf.close()
def create_results_dir(self, queue_entry=None):
@@ -1017,17 +1178,13 @@
ctrl.flush()
if self.is_synchronous():
- if self.num_machines() > 1:
- self.write_machines_file()
- hosts = self.get_host_queue_entries()
- hostnames = ','.join([i.host.hostname for i in hosts])
queue_entries = self.get_host_queue_entries()
else:
assert queue_entry
- hostnames = queue_entry.host.hostname
queue_entries = [queue_entry]
+ hostnames = ','.join([entry.get_host().hostname
+ for entry in queue_entries])
- global _autoserv_path
params = [_autoserv_path, '-n', '-r', results_dir,
'-b', '-u', self.owner, '-l', self.name,
'-m', hostnames, ctrl.name]
@@ -1038,8 +1195,10 @@
tasks = []
if not self.is_synchronous():
tasks.append(VerifyTask(queue_entry))
-
- tasks.append(QueueTask(job = self, queue_entries = queue_entries, cmd = params))
+
+ tasks.append(QueueTask(job = self,
+ queue_entries = queue_entries,
+ cmd = params))
agent = Agent(tasks)