-when PidfileRunMonitor finds no pidfile, just wait; after a timeout, send a notification email and treat the process as failed (see the sketch below)
-when requeuing a job during recovery, don't just delete old results; move them to a temp dir and print its path
-always reverify "Running" hosts (not just in host-recovery mode)
-reorganize the recovery code a bit
-remove the now-unnecessary check for queue.log files in clear_results_dir (a leftover from when we had ReverifyTask)
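
A rough sketch of the new pidfile-timeout handling (illustration only, not part
of the patch; it mirrors PidfileRunMonitor.get_pidfile_info()/_handle_no_pid()
in the diff below):

    # reached when read_pidfile() finds no pid yet
    message = 'No pid found at ' + self.pid_file
    if time.time() - self.start_time > PIDFILE_TIMEOUT:
        # timed out: notify and report the process as failed (exit_status = 1)
        send_notify_email('Process has failed to write pidfile', message)
        self.on_lost_process(0)
        return self.pid, self.exit_status
    print message
    return None, None   # still within the grace period - keep waiting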

Signed-off-by: Steve Howard <showard@google.com>



git-svn-id: http://test.kernel.org/svn/autotest/trunk@1362 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/scheduler/monitor_db b/scheduler/monitor_db
index 0f300aa..f1e340c 100755
--- a/scheduler/monitor_db
+++ b/scheduler/monitor_db
@@ -23,6 +23,8 @@
         sys.path.insert(0, AUTOTEST_SERVER_DIR)
 
 AUTOSERV_PID_FILE = '.autoserv_execute'
+# how long to wait for autoserv to write a pidfile
+PIDFILE_TIMEOUT = 5 * 60 # 5 min
 
 _db = None
 _shutdown = False
@@ -265,6 +267,8 @@
 
 
 class Dispatcher:
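+	# class-level cache of the ps snapshot of autoserv processes, reset each tick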
+	autoserv_procs_cache = None
+
 	def __init__(self):
 		self._agents = []
 
@@ -278,6 +282,7 @@
 
 
 	def tick(self):
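+		# invalidate the cached ps snapshot so this tick sees fresh process state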
+		Dispatcher.autoserv_procs_cache = None
 		self._find_aborting()
 		self._find_more_work()
 		self._handle_agents()
@@ -300,22 +305,34 @@
 		self._agents.remove(agent)
 
 
-	def find_orphaned_autoservs(self):
+	@classmethod
+	def find_autoservs(cls, orphans_only=False):
 		"""\
 		Returns a dict mapping pids to command lines for root autoserv
-		processes that have been orphaned.
+		processes.  If orphans_only=True, return only processes that
+		have been orphaned (i.e. parent pid = 1).
 		"""
+		if cls.autoserv_procs_cache is not None:
+			return cls.autoserv_procs_cache
+
 		proc = subprocess.Popen(
-		    ['/bin/ps', 'x', '-o', 'pid,ppid,comm,args'],
+		    ['/bin/ps', 'x', '-o', 'pid,pgid,ppid,comm,args'],
 		    stdout=subprocess.PIPE)
 		# split each line into the four columns output by ps
-		procs = [line.split(None, 3) for line in
+		procs = [line.split(None, 4) for line in
 			 proc.communicate()[0].splitlines()]
-		autoserv_procs = [(int(proc[0]), proc[3]) # pid, args
-				  for proc in procs
-				  if proc[2] == 'autoserv' # comm
-				  and proc[1] == '1'] # ppid
-		return dict(autoserv_procs)
+		autoserv_procs = {}
+		for proc in procs:
+			# check ppid == 1 for orphans
+			if orphans_only and proc[2] != '1':
+				continue
+			# only root autoserv processes have pgid == pid
+			if (proc[3] == 'autoserv' and   # comm
+			    proc[1] == proc[0]):        # pgid == pid
+				# map pid to args
+				autoserv_procs[int(proc[0])] = proc[4]
+		cls.autoserv_procs_cache = autoserv_procs
+		return autoserv_procs
 
 
 	def recover_queue_entry(self, queue_entry, run_monitor):
@@ -335,7 +352,7 @@
 
 
 	def _recover_processes(self):
-		orphans = self.find_orphaned_autoservs()
+		orphans = self.find_autoservs(orphans_only=True)
 
 		# first, recover running queue entries
 		rows = _db.execute("""SELECT * FROM host_queue_entries
@@ -375,7 +392,7 @@
 		queue_entries = [HostQueueEntry(row=i) for i in rows]
 		for queue_entry in queue_entries + requeue_entries:
 			print 'Requeuing running QE %d' % queue_entry.id
-			queue_entry.clear_results_dir()
+			queue_entry.clear_results_dir(dont_delete_files=True)
 			queue_entry.requeue()
 
 
@@ -385,10 +402,12 @@
 			kill_autoserv(pid)
 
 		# recover aborting tasks
+		rebooting_host_ids = set()
 		rows = _db.execute("""SELECT * FROM host_queue_entries
 				   WHERE status='Abort' or status='Aborting'""")
 		queue_entries = [HostQueueEntry(row=i) for i in rows]
 		for queue_entry in queue_entries:
+			print 'Recovering aborting QE %d' % queue_entry.id
 			queue_host = queue_entry.get_host()
 			reboot_task = RebootTask(queue_host)
 			verify_task = VerifyTask(host = queue_host)
@@ -398,37 +417,46 @@
 			queue_entry.set_status('Aborted')
 			# Secure the host from being picked up
 			queue_host.set_status('Rebooting')
+			rebooting_host_ids.add(queue_host.id)
 
 		# reverify hosts that were in the middle of verify, repair or
 		# reboot
-		rows = _db.execute("""SELECT * FROM hosts
-				      WHERE locked = 0 AND
-				      (status = 'Repairing'
-				       OR status = 'Verifying'
-				       OR status = 'Rebooting')""")
+		self._reverify_hosts_where("""(status = 'Repairing' OR
+		                               status = 'Verifying' OR
+					       status = 'Rebooting')""",
+					   exclude_ids=rebooting_host_ids)
+
+		# finally, recover "Running" hosts with no active queue entries,
+		# although this should never happen
+		message = ('Recovering running host %s - this probably '
+			   'indicates a scheduler bug')
+		self._reverify_hosts_where("""status = 'Running' AND
+		                              id NOT IN (SELECT host_id
+					                 FROM host_queue_entries
+							 WHERE active)""",
+					   print_message=message)
+
+
+	def _reverify_hosts_where(self, where,
+				  print_message='Reverifying host %s',
+				  exclude_ids=set()):
+		rows = _db.execute('SELECT * FROM hosts WHERE locked = 0 AND ' +
+				   where)
 		hosts = [Host(row=i) for i in rows]
 		for host in hosts:
-			self.add_agent(Agent(tasks=[VerifyTask(host=host)]))
+			if host.id in exclude_ids:
+				continue
+			if print_message is not None:
+				print print_message % host.hostname
+			verify_task = VerifyTask(host = host)
+			self.add_agent(Agent(tasks = [verify_task]))
 
 
 	def _recover_hosts(self):
-		# look for both "Repair Failed" hosts, which we expect, and
-		# "Running" hosts with no active queue entries, which should
-		# never happen
-		rows = _db.execute(
-		    """SELECT * FROM hosts WHERE locked = 0 AND
-		       (status = 'Repair Failed'
-		        OR (status = 'Running' AND
-			    id NOT IN (SELECT host_id FROM host_queue_entries
-			               WHERE active)))""")
-		hosts = [Host(row=i) for i in rows]
-		for host in hosts:
-			if host.status == 'Running':
-				print ('Recovering running host %s - this '
-				       'probably indicates a scheduler bug' %
-				       host.hostname)
-			verify_task = VerifyTask(host = host)
-			self.add_agent(Agent(tasks = [verify_task]))
+		# recover "Repair Failed" hosts
+		message = 'Reverifying dead host %s'
+		self._reverify_hosts_where("status = 'Repair Failed'",
+					   print_message=message)
 
 
 	def _find_more_work(self):
@@ -553,6 +581,7 @@
 		self.results_dir = os.path.abspath(results_dir)
 		self.pid_file = os.path.join(results_dir, AUTOSERV_PID_FILE)
 		self.lost_process = False
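+		# remember when we started waiting for the pidfile (used for the timeout)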
+		self.start_time = time.time()
 		if cmd is None:
 			# we're reattaching to an existing pid, so don't call
 			# the superconstructor (we don't want to kick off a new
@@ -568,7 +597,17 @@
 		return pid
 
 
-	def check_proc_fs(self, pid):
+	def _check_command_line(self, command_line, spacer=' ',
+				print_error=False):
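+		# surround '-r <results_dir>' with spacers so it only matches whole arguments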
+		results_dir_arg = spacer.join(('', '-r', self.results_dir, ''))
+		match = results_dir_arg in command_line
+		if print_error and not match:
+			print '%s not found in %s' % (repr(results_dir_arg),
+						      repr(command_line))
+		return match
+
+
+	def _check_proc_fs(self, pid):
 		cmdline_path = os.path.join('/proc', str(pid), 'cmdline')
 		try:
 			cmdline_file = open(cmdline_path, 'r')
@@ -577,12 +616,8 @@
 		except IOError:
 			return False
 		# /proc/.../cmdline has \x00 separating args
-		results_dir_arg = '\x00-r\x00%s\x00' % self.results_dir
-		if results_dir_arg not in cmdline:
-			print '%s not found in %s' % (repr(results_dir_arg),
-						      repr(cmdline))
-			return False
-		return True
+		return self._check_command_line(cmdline, spacer='\x00',
+						print_error=True)
 
 
 	def read_pidfile(self):
@@ -603,6 +638,14 @@
 		return pid, exit_status
 
 
+	def _find_autoserv_proc(self):
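+		# look for a running autoserv whose command line references our results dir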
+		autoserv_procs = Dispatcher.find_autoservs()
+		for pid, args in autoserv_procs.iteritems():
+			if self._check_command_line(args):
+				return pid, args
+		return None, None
+
+
 	def get_pidfile_info(self):
 		"""\
 		Returns:
@@ -615,12 +658,18 @@
 
 		pid, exit_status = self.read_pidfile()
 
-		# double check autoserv is really running if it says it is
-		if (pid is not None and exit_status is None
-		    and not self.check_proc_fs(pid)):
-			# maybe process *just* exited
+		if pid is None:
+			return self._handle_no_pid()
+
+		if exit_status is None:
+			# double check whether or not autoserv is running
+			proc_running = self._check_proc_fs(pid)
+			if proc_running:
+				return pid, exit_status
+
+			# pid but no process - maybe process *just* exited
 			pid, exit_status = self.read_pidfile()
-			if not exit_status:
+			if exit_status is None:
 				# autoserv exited without writing an exit code
 				# to the pidfile
 				error = ('autoserv died without writing exit '
@@ -629,17 +678,55 @@
 				    pid, self.pid_file)
 				print message
 				send_notify_email(error, message)
-				self.lost_process = True
-				self.pid = pid
-				self.exit_status = 1
+				self.on_lost_process(pid)
 				return self.pid, self.exit_status
 
 		return pid, exit_status
 
 
+	def _handle_no_pid(self):
+		"""\
+		Called when no pidfile is found or no pid is in the pidfile.
+		"""
+		# is autoserv running?
+		pid, args = self._find_autoserv_proc()
+		if pid is None:
+			# no autoserv process running
+			message = 'No pid found at ' + self.pid_file
+		else:
+			message = ("Process %d (%s) hasn't written pidfile %s" %
+				   (pid, args, self.pid_file))
+
+		print message
+		if time.time() - self.start_time > PIDFILE_TIMEOUT:
+			send_notify_email('Process has failed to write pidfile',
+					  message)
+			if pid is not None:
+				kill_autoserv(pid)
+			else:
+				pid = 0
+			self.on_lost_process(pid)
+			return self.pid, self.exit_status
+
+		return None, None
+
+
+	def on_lost_process(self, pid):
+		"""\
+		Called when autoserv has exited without writing an exit status,
+		or we've timed out waiting for autoserv to write a pid to the
+		pidfile.  In either case, we just return failure and the caller
+		should signal some kind of warning.
+
+		pid is unimportant here, as it shouldn't be used by anyone.
+		"""
+		self.lost_process = True
+		self.pid = pid
+		self.exit_status = 1
+
+
 	def exit_code(self):
 		pid, exit_code = self.get_pidfile_info()
-		assert pid is not None
 		return exit_code
 
 
@@ -1323,15 +1410,21 @@
 			self.job.stop_all_entries()
 
 
-	def clear_results_dir(self, results_dir=None):
+	def clear_results_dir(self, results_dir=None, dont_delete_files=False):
 		results_dir = results_dir or self.results_dir()
 		if not os.path.exists(results_dir):
 			return
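+		# during recovery, preserve old results for inspection instead of deleting them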
+		if dont_delete_files:
+			temp_dir = tempfile.mkdtemp(suffix='.clear_results')
+			print 'Moving results from %s to %s' % (results_dir,
+								temp_dir)
 		for filename in os.listdir(results_dir):
-			if 'queue.log' in filename:
-				continue
 			path = os.path.join(results_dir, filename)
-			remove_file_or_dir(path)
+			if dont_delete_files:
+				shutil.move(path,
+					    os.path.join(temp_dir, filename))
+			else:
+				remove_file_or_dir(path)
 
 
 class Job(DBObject):