Verify repair fixes for scheduler

-change Agent and AgentTask to support a clear_queue_on_failure constructor
param to AgentTasks.  If True (default), behavior is as before.  If False, the
rest of the queue will not be cleared on task failure - failure tasks will be
inserted before the rest of the tasks.
-successful RepairTask sets host status to 'Repair Succeeded', not 'Ready',
because successful repair is no guarantee the host is actually OK.  We depend
on a ReverifyTask to follow the RepairTask and verify the machine is actually
ready.
-added ReverifyTask and modified VerifyTask to support flowchart as posted at ht
tp://test.kernel.org/autotest/SchedulerSpecification.
-added SchedulerReverifyTask, and refactored all the verify stuff (including add
ing a SchedulerVerifyMixin) to cleanly support both synch and async verify and r
everify.  This includes a fair bit of use of template methods.
-moved some logic from VerifyTask and VerifySynchronousTask out into HostQueueEn
try and Job, including figuring out the verify results dir and handling host ver
ify/repair failure (requeue a meta-host, fail a non-meta-host, stop other entrie
s for a synch job). This is cleaner with the increased complexity of verify/repa
ir.
-modified results moving code to just overwrite existing files/directories.  It 
prints a warning when it does this, but doesn't crash anymore.  Phew.
-modified machines file logic to only write a .machines file for a multi-machine
 async job, and write it on-the-fly as each host runs.
-made meta-host entries unblock hosts when releasing them - releasing only happe
ns when a host fails verify + repair, and in this case, we want to allow assigni
ng back to that host if it should get repaired later
-added DBObject.fetch() and DBObject.delete() to support host unblocking.  Had t
o change how DBObject gets the table name, so that DBObject.fetch(), which must 
be a class method, could access the table name.
-added timestamp to queue.log logging
-global statements are only necessary when writing globals

Signed-off-by: Steve Howard <showard@google.com>



git-svn-id: http://test.kernel.org/svn/autotest/trunk@1281 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/scheduler/monitor_db b/scheduler/monitor_db
index f89c846..b2db1a1 100755
--- a/scheduler/monitor_db
+++ b/scheduler/monitor_db
@@ -6,7 +6,7 @@
 __author__ = "Paul Turner <pjt@google.com>"
 
 import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
-import optparse, signal, smtplib, socket
+import optparse, signal, smtplib, socket, datetime, stat
 from common import global_config
 
 RESULTS_DIR = '.'
@@ -125,6 +125,15 @@
 	return hosts
 
 
+def remove_file_or_dir(path):
+	if stat.S_ISDIR(os.stat(path).st_mode):
+		# directory
+		shutil.rmtree(path)
+	else:
+		# file
+		os.remove(path)
+
+
 class DatabaseConn:
 	def __init__(self):
 		self.reconnect_wait = 20
@@ -186,7 +195,6 @@
 
 
 def parse_results(results_dir, flags=""):
-	global _testing_mode
 	if _testing_mode:
 		return
 	parse = os.path.join(AUTOTEST_TKO_DIR, 'parse')
@@ -372,11 +380,9 @@
 		if self.active_task:
 			assert self.active_task.is_done()
 
-			if not self.active_task.success and self.active_task.failure_tasks:
-				self.queue = Queue.Queue(0)
-				for task in self.active_task.failure_tasks:
-					self.add_task(task)
-		
+			if not self.active_task.success:
+				self.on_task_failure()
+
 		self.active_task = None
 		if not self.is_done():
 			self.active_task = self.queue.get_nowait()
@@ -384,6 +390,15 @@
 				self.active_task.start()
 
 
+	def on_task_failure(self):
+		old_queue = self.queue
+		self.queue = Queue.Queue(0)
+		for task in self.active_task.failure_tasks:
+			self.add_task(task)
+		if not self.active_task.clear_queue_on_failure:
+			while not old_queue.empty():
+				self.add_task(old_queue.get_nowait())
+
 	def is_done(self):
 		return self.active_task == None and self.queue.empty()
 
@@ -395,9 +410,18 @@
 
 
 class AgentTask(object):
-	def __init__(self, cmd, failure_tasks = None):
+	def __init__(self, cmd, failure_tasks = [],
+		     clear_queue_on_failure=True):
+		"""\
+		By default, on failure, the Agent's task queue is cleared and
+		replaced with the tasks in failure_tasks.  If
+		clear_queue_on_failure=False, the task queue will not be
+		cleared, and the tasks in failure_tasks will be inserted at the
+		beginning of the queue.
+		"""
 		self.done = False
 		self.failure_tasks = failure_tasks
+		self.clear_queue_on_failure = clear_queue_on_failure
 		self.started = False
 		self.cmd = cmd
 		self.agent = None
@@ -468,10 +492,10 @@
 
 class RepairTask(AgentTask):
 	def __init__(self, host):
-		global _autoserv_path
 		cmd = [_autoserv_path ,'-n', '-R', '-m', host.hostname]
 		self.host = host
-		AgentTask.__init__(self, cmd)
+		AgentTask.__init__(self, cmd, clear_queue_on_failure=False)
+
 
 	def prolog(self):
 		print "repair_task starting"
@@ -480,7 +504,7 @@
 
 	def epilog(self):
 		if self.success:
-			status = 'Ready'
+			status = 'Repair Succeeded'
 		else:
 			status = 'Repair Failed'
 
@@ -495,11 +519,19 @@
 		self.queue_entry = queue_entry
 
 		self.temp_results_dir = tempfile.mkdtemp(suffix='.verify')
-		global _autoserv_path
 		cmd = [_autoserv_path,'-n', '-v','-m',self.host.hostname,
 		       '-r', self.temp_results_dir]
 
-		AgentTask.__init__(self, cmd, failure_tasks = [RepairTask(self.host)])
+		failure_tasks = self.get_failure_tasks()
+
+		AgentTask.__init__(self, cmd, failure_tasks=failure_tasks,
+				   clear_queue_on_failure=False)
+
+
+	def get_failure_tasks(self):
+		'To be overridden'
+		return [RepairTask(self.host),
+			ReverifyTask(self.queue_entry, self.host)]
 
 
 	def prolog(self):
@@ -513,68 +545,115 @@
 		if self.queue_entry and (self.success or
 					 not self.queue_entry.meta_host):
 			self.move_results()
-		else:
-			shutil.rmtree(self.temp_results_dir)
+		shutil.rmtree(self.temp_results_dir)
 
 		if self.success:
-			status = 'Ready'
+			self.on_success()
 		else:
-			status = 'Failed Verify'
-			if self.queue_entry:
-				if self.queue_entry.meta_host:
-					self.host.yield_work()
-				else:
-					self.queue_entry.set_status('Failed')
+			self.on_failure()
 
-		self.host.set_status(status)
+
+	def on_success(self):
+		self.host.set_status('Ready')
+
+
+	def on_failure(self):
+		self.host.set_status('Failed Verify')
+		# don't use queue_entry.requeue() here, because we don't want
+		# a meta-host entry to release its host yet - that should only
+		# happen after reverify fails
+		if self.queue_entry:
+			self.queue_entry.set_status('Queued')
 
 
 	def move_results(self):
 		assert self.queue_entry is not None
-		target_dir = self.queue_entry.results_dir()
-		if self.queue_entry.job.is_synchronous():
-			target_dir = os.path.join(target_dir,
-						 self.queue_entry.host.hostname)
+		target_dir = self.queue_entry.verify_results_dir()
 		if not os.path.exists(target_dir):
 			os.makedirs(target_dir)
 		files = os.listdir(self.temp_results_dir)
 		for filename in files:
-			shutil.move(os.path.join(self.temp_results_dir,
-						 filename),
-				    os.path.join(target_dir, filename))
+			self.force_move(os.path.join(self.temp_results_dir,
+						     filename),
+					os.path.join(target_dir, filename))
 
 
-class VerifySynchronousTask(VerifyTask):
+	@staticmethod
+	def force_move(source, dest):
+		"""\
+		Replacement for shutil.move() that will delete the destination
+		if it exists, even if it's a directory.
+		"""
+		if os.path.exists(dest):
+			print ('Warning: removing existing destination file ' +
+			       dest)
+			remove_file_or_dir(dest)
+		shutil.move(source, dest)
+
+
+class ReverifyTask(VerifyTask):
+	def __init__(self, queue_entry=None, host=None):
+		if queue_entry:
+			VerifyTask.__init__(self, queue_entry=queue_entry)
+		else:
+			VerifyTask.__init__(self, host=host)
+		self.clear_queue_on_failure = True
+
+
+	def get_failure_tasks(self):
+		return []
+
+
+	def prolog(self):
+		VerifyTask.prolog(self)
+		if self.queue_entry:
+			self.queue_entry.clear_results_dir(
+			    self.queue_entry.verify_results_dir())
+
+
+	def on_failure(self):
+		self.host.set_status('Repair Failed')
+		if self.queue_entry:
+			self.queue_entry.handle_host_failure()
+
+
+class VerifySynchronousMixin(object):
+	def on_pending(self):
+		if self.queue_entry.job.num_complete() > 0:
+			# some other entry failed verify, and we've
+			# already been marked as stopped
+			return
+
+		self.queue_entry.set_status('Pending')
+		job = self.queue_entry.job
+		if job.is_ready():
+			agent = job.run(self.queue_entry)
+			self.agent.dispatcher.add_agent(agent)
+
+
+class VerifySynchronousTask(VerifyTask, VerifySynchronousMixin):
 	def __init__(self, queue_entry):
 		VerifyTask.__init__(self, queue_entry = queue_entry)
 
 
-	def epilog(self):
-		VerifyTask.epilog(self)
-		print "verify_synchronous finished: %s/%s" % (self.queue_entry.host.hostname, self.success)
-		if self.success:
-			if self.queue_entry.job.num_complete()==0:
-				self.queue_entry.set_status('Pending')
-	
-				job = self.queue_entry.job
-				if job.is_ready():
-					self.agent.dispatcher.add_agent(job.run(self.queue_entry))
-			else:
-				self.queue_entry.set_status('Stopped') # some other entry failed verify
-		else:
-			if self.queue_entry.meta_host:
-				self.queue_entry.set_status('Queued')
-			else:
-				# VerifyTask.epilog() set this queue entry to
-				# failed, so it won't be set to stopped
-				self.stop_all_entries()
+	def get_failure_tasks(self):
+		return [RepairTask(self.host),
+			ReverifySynchronousTask(self.queue_entry)]
 
 
-	def stop_all_entries(self):
-		job = self.queue_entry.job
-		for child_entry in job.get_host_queue_entries():
-			if not child_entry.complete:
-				child_entry.set_status('Stopped')
+	def on_success(self):
+		VerifyTask.on_success(self)
+		self.on_pending()
+
+
+class ReverifySynchronousTask(ReverifyTask, VerifySynchronousMixin):
+	def __init__(self, queue_entry):
+		ReverifyTask.__init__(self, queue_entry = queue_entry)
+
+
+	def on_success(self):
+		ReverifyTask.on_success(self)
+		self.on_pending()
 
 
 class QueueTask(AgentTask):
@@ -593,15 +672,20 @@
 
 
 	def prolog(self):
+		# write some job timestamps into the job keyval file
+		queued = time.mktime(self.job.created_on.timetuple())
+		started = time.time()
+		self._write_keyval(self.queue_entries[0], "job_queued", queued)
+		self._write_keyval(self.queue_entries[0], "job_started",
+				   started)
 		for queue_entry in self.queue_entries:
 			print "starting queue_task on %s/%s" % (queue_entry.host.hostname, queue_entry.id)
 			queue_entry.set_status('Running')
 			queue_entry.host.set_status('Running')
-			# write some job timestamps into the job keyval file
-			queued = time.mktime(self.job.created_on.timetuple())
-			started = time.time()
-			self._write_keyval(queue_entry, "job_queued", queued)
-			self._write_keyval(queue_entry, "job_started", started)
+		if (not self.job.is_synchronous() and
+		    self.job.num_machines() > 1):
+			assert len(self.queue_entries) == 1
+			self.job.write_to_machines_file(self.queue_entries[0])
 
 
 	def epilog(self):
@@ -610,13 +694,13 @@
 		else:
 			status = 'Failed'
 
+		# write another timestamp into the job keyval file
+		finished = time.time()
+		self._write_keyval(self.queue_entries[0], "job_finished",
+				   finished)
 		for queue_entry in self.queue_entries:
 			queue_entry.set_status(status)
 			queue_entry.host.set_status('Ready')
-			# write another timestamp into the job keyval file	
-			finished = time.time()
-			self._write_keyval(queue_entry, "job_finished",
-					   finished)
 
 		if self.job.is_synchronous() or self.job.num_machines()==1:
 			if self.job.is_finished():
@@ -631,7 +715,6 @@
 class RebootTask(AgentTask):
 	def __init__(self):
 		AgentTask.__init__(self, host)
-		global _autoserv_path
 		self.cmd = "%s -n -b -m %s /dev/null" % (_autoserv_path, host)
 		self.host = host
 
@@ -646,10 +729,10 @@
 
 
 class DBObject(object):
-	def __init__(self, table, fields, id=None, row=None, new_record=False):
-		assert (bool(id) != bool(row)) and table and fields
+	def __init__(self, fields, id=None, row=None, new_record=False):
+		assert (bool(id) != bool(row)) and fields
 
-		self.__table = table
+		self.__table = self._get_table()
 		self.__fields = fields
 
 		self.__new_record = new_record
@@ -662,7 +745,9 @@
 							(self.__table, id)
 			row = rows[0]
 
-		assert len(row)==len(fields), "table = %s, row = %s/%d, fields = %s/%d" % (table, row, len(row), fields, len(fields))
+		assert len(row)==len(fields), (
+		    "table = %s, row = %s/%d, fields = %s/%d" % (
+		    self.__table, row, len(row), fields, len(fields)))
 
 		self.__valid_fields = {}
 		for i,value in enumerate(row):
@@ -671,6 +756,12 @@
 
 		del self.__valid_fields['id']
 
+
+	@classmethod
+	def _get_table(cls):
+		raise NotImplementedError('Subclasses must override this')
+
+
 	def count(self, where, table = None):
 		if not table:
 			table = self.__table
@@ -713,19 +804,40 @@
 			_db.execute(query)
 
 
+	def delete(self):
+		query = 'DELETE FROM %s WHERE id=%%s' % self.__table
+		_db.execute(query, (self.id,))
+
+
+	@classmethod
+	def fetch(cls, where):
+		rows = _db.execute(
+		    'SELECT * FROM %s WHERE %s' % (cls._get_table(), where))
+		for row in rows:
+			yield cls(row=row)
+
 
 class IneligibleHostQueue(DBObject):
 	def __init__(self, id=None, row=None, new_record=None):
 		fields = ['id', 'job_id', 'host_id']
-		DBObject.__init__(self, 'ineligible_host_queues', fields,
-					id=id, row=row, new_record=new_record)
+		DBObject.__init__(self, fields, id=id, row=row,
+				  new_record=new_record)
+
+
+	@classmethod
+	def _get_table(cls):
+		return 'ineligible_host_queues'
 
 
 class Host(DBObject):
 	def __init__(self, id=None, row=None):
 		fields =  ['id', 'hostname', 'locked', 'synch_id','status']
-		DBObject.__init__(self, 'hosts',fields, id=id, row=row)
-		
+		DBObject.__init__(self, fields, id=id, row=row)
+
+
+	@classmethod
+	def _get_table(cls):
+		return 'hosts'
 
 
 	def current_task(self):
@@ -782,8 +894,7 @@
 		assert id or row
 		fields = ['id', 'job_id', 'host_id', 'priority', 'status',
 			  'meta_host', 'active', 'complete']
-		DBObject.__init__(self, 'host_queue_entries', fields, id=id,
-									row=row)
+		DBObject.__init__(self, fields, id=id, row=row)
 
 		self.job = Job(self.job_id)
 
@@ -796,43 +907,67 @@
 						   'queue.log.' + str(self.id))
 
 
+	@classmethod
+	def _get_table(cls):
+		return 'host_queue_entries'
+
+
 	def set_host(self, host):
 		if host:
 			self.queue_log_record('Assigning host ' + host.hostname)
 			self.update_field('host_id', host.id)
 			self.update_field('active', True)
+			self.block_host(host.id)
 		else:
 			self.queue_log_record('Releasing host')
+			self.unblock_host(self.host.id)
 			self.update_field('host_id', None)
 
 		self.host = host
 
 
 	def get_host(self):
-		if not self.host:
-			if self.host_id:
-				self.host  = Host(self.host_id)
-		if self.host:
-			return self.host
-		else:
-			return None
+		return self.host
 
 
 	def queue_log_record(self, log_line):
+		now = str(datetime.datetime.now())
 		queue_log = open(self.queue_log_path, 'a', 0)
-		queue_log.write(log_line + '\n')
+		queue_log.write(now + ' ' + log_line + '\n')
 		queue_log.close()
 
 
+	def block_host(self, host_id):
+		print "creating block %s/%s" % (self.job.id, host_id)
+		row = [0, self.job.id, host_id]
+		block = IneligibleHostQueue(row=row, new_record=True)
+		block.save()
+
+
+	def unblock_host(self, host_id):
+		print "removing block %s/%s" % (self.job.id, host_id)
+		blocks = list(IneligibleHostQueue.fetch(
+		    'job_id=%d and host_id=%d' % (self.job.id, host_id)))
+		assert len(blocks) == 1
+		blocks[0].delete()
+
+
 	def results_dir(self):
-		if self.job.num_machines()==1 or self.job.is_synchronous():
-			results_dir = self.job.job_dir
+		if self.job.is_synchronous() or self.job.num_machines() == 1:
+			return self.job.job_dir
 		else:
 			assert self.host
-			results_dir = '%s/%s' % (self.job.job_dir,
-						 self.host.hostname)
+			return os.path.join(self.job.job_dir,
+					    self.host.hostname)
 
-		return results_dir
+
+	def verify_results_dir(self):
+		if self.job.is_synchronous() or self.job.num_machines() > 1:
+			assert self.host
+			return os.path.join(self.job.job_dir,
+					    self.host.hostname)
+		else:
+			return self.job.job_dir
 
 
 	def set_status(self, status):
@@ -858,40 +993,62 @@
 	def run(self,assigned_host=None):
 		if self.meta_host:
 			assert assigned_host
+			# ensure results dir exists for the queue log
 			self.job.create_results_dir()
 			self.set_host(assigned_host)
-			print "creating block %s/%s" % (self.job.id,
-							self.host.id)
 
-			row = [0, self.job.id, self.host.id]
-			block = IneligibleHostQueue(row=row, new_record=True)
-			block.save()
-		
 		print "%s/%s scheduled on %s, status=%s" % (self.job.name,
 				self.meta_host, self.host.hostname, self.status)
 
 		return self.job.run(queue_entry=self)
-	
+
 	def requeue(self):
 		self.set_status('Queued')
-		
+
 		if self.meta_host:
 			self.set_host(None)
 
 
-	def clear_results_dir(self):
-		if os.path.exists(self.results_dir()):
-			shutil.rmtree(self.results_dir())
+	def handle_host_failure(self):
+		"""\
+		Called when this queue entry's host has failed verification and
+		repair.
+		"""
+		if self.meta_host:
+			self.requeue()
+		else:
+			self.set_status('Failed')
+			if self.job.is_synchronous():
+				self.job.stop_all_entries()
+
+
+	def clear_results_dir(self, results_dir=None):
+		results_dir = results_dir or self.results_dir()
+		if not os.path.exists(results_dir):
+			return
+		for filename in os.listdir(results_dir):
+			if 'queue.log' in filename:
+				continue
+			path = os.path.join(results_dir, filename)
+			remove_file_or_dir(path)
 
 
 class Job(DBObject):
 	def __init__(self, id=None, row=None):
 		assert id or row
-		DBObject.__init__(self,'jobs',['id','owner','name','priority','control_file',
-                                  'control_type','created_on', 'synch_type', 'synch_count',
-                                  'synchronizing' ], id=id, row=row)
+		DBObject.__init__(self,
+				  ['id','owner','name','priority',
+				   'control_file','control_type','created_on',
+				   'synch_type', 'synch_count','synchronizing'],
+				  id=id, row=row)
 
-		self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id, self.owner))
+		self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
+								    self.owner))
+
+
+	@classmethod
+	def _get_table(cls):
+		return 'jobs'
 
 
 	def is_server_job(self):
@@ -977,15 +1134,19 @@
 		self.set_status('Queued', update_queues = False)
 
 
-	def write_machines_file(self):
-		if self.num_machines()>1:
-			print "writing machines file"
-			mf = open("%s/.machines" % self.job_dir, 'w')
-			for queue_entry in self.get_host_queue_entries():
-				if queue_entry.get_host():
-					mf.write("%s\n" % \
-						queue_entry.get_host().hostname)
-			mf.close()
+	def stop_all_entries(self):
+		for child_entry in self.get_host_queue_entries():
+			if not child_entry.complete:
+				child_entry.set_status('Stopped')
+
+
+	def write_to_machines_file(self, queue_entry):
+		hostname = queue_entry.get_host().hostname
+		print "writing %s to job %s machines file" % (hostname, self.id)
+		file_path = os.path.join(self.job_dir, '.machines')
+		mf = open(file_path, 'a')
+		mf.write("%s\n" % queue_entry.get_host().hostname)
+		mf.close()
 
 
 	def create_results_dir(self, queue_entry=None):
@@ -1017,17 +1178,13 @@
 		ctrl.flush()
 
 		if self.is_synchronous():
-			if self.num_machines() > 1:
-				self.write_machines_file()
-			hosts = self.get_host_queue_entries()
-			hostnames = ','.join([i.host.hostname for i in hosts])
 			queue_entries = self.get_host_queue_entries()
 		else:
 			assert queue_entry
-			hostnames = queue_entry.host.hostname
 			queue_entries = [queue_entry]
+		hostnames = ','.join([entry.get_host().hostname
+				      for entry in queue_entries])
 
-		global _autoserv_path
 		params = [_autoserv_path, '-n', '-r', results_dir,
 			'-b', '-u', self.owner, '-l', self.name,
 			'-m', hostnames, ctrl.name]
@@ -1038,8 +1195,10 @@
 		tasks = []
 		if not self.is_synchronous():
 			tasks.append(VerifyTask(queue_entry))
-			
-		tasks.append(QueueTask(job = self, queue_entries = queue_entries, cmd = params))
+
+		tasks.append(QueueTask(job = self,
+				       queue_entries = queue_entries,
+				       cmd = params))
 
 		agent = Agent(tasks)