-when PidfileRunMonitor finds no pidfile, just wait; after a timeout, send a notification email and act as if the process failed (sketched below)
-when requeuing a job during recovery, don't just delete the old results; instead, move them to a temp dir and print its path
-always reverify "Running" hosts (not just in host-recovery mode)
-reorganized the recovery code a bit
-removed the unnecessary check for queue.log files in clear_results_dir (a leftover from when we had ReverifyTask)
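
A rough sketch of the new no-pidfile path in PidfileRunMonitor (condensed
from the diff below; PIDFILE_TIMEOUT, send_notify_email, kill_autoserv and
on_lost_process are the real names, but the control flow shown here is
simplified):

    pid, exit_status = self.read_pidfile()
    if pid is None:
        # no pidfile yet - is autoserv even running?
        pid, args = self._find_autoserv_proc()
        if time.time() - self.start_time < PIDFILE_TIMEOUT:
            return None, None  # not timed out yet; keep waiting
        # timed out: notify, kill any stray autoserv, report failure
        send_notify_email('Process has failed to write pidfile',
                          'No pid found at ' + self.pid_file)
        if pid is not None:
            kill_autoserv(pid)
        self.on_lost_process(pid or 0)  # records exit_status = 1
        return self.pid, self.exit_status
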
Signed-off-by: Steve Howard <showard@google.com>
git-svn-id: http://test.kernel.org/svn/autotest/trunk@1362 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/scheduler/monitor_db b/scheduler/monitor_db
index 0f300aa..f1e340c 100755
--- a/scheduler/monitor_db
+++ b/scheduler/monitor_db
@@ -23,6 +23,8 @@
sys.path.insert(0, AUTOTEST_SERVER_DIR)
AUTOSERV_PID_FILE = '.autoserv_execute'
+# how long to wait for autoserv to write a pidfile
+PIDFILE_TIMEOUT = 5 * 60 # 5 min
_db = None
_shutdown = False
@@ -265,6 +267,8 @@
class Dispatcher:
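+ # per-tick cache of running autoserv processes (pid -> args); tick()
+ # clears it and find_autoservs() repopulates it on first use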
+ autoserv_procs_cache = None
+
def __init__(self):
self._agents = []
@@ -278,6 +282,7 @@
def tick(self):
+ Dispatcher.autoserv_procs_cache = None
self._find_aborting()
self._find_more_work()
self._handle_agents()
@@ -300,22 +305,34 @@
self._agents.remove(agent)
- def find_orphaned_autoservs(self):
+ @classmethod
+ def find_autoservs(cls, orphans_only=False):
"""\
Returns a dict mapping pids to command lines for root autoserv
- processes that have been orphaned.
+ processes. If orphans_only=True, return only processes that
+ have been orphaned (i.e. parent pid = 1).
"""
+ if cls.autoserv_procs_cache is not None:
+ return cls.autoserv_procs_cache
+
proc = subprocess.Popen(
- ['/bin/ps', 'x', '-o', 'pid,ppid,comm,args'],
+ ['/bin/ps', 'x', '-o', 'pid,pgid,ppid,comm,args'],
stdout=subprocess.PIPE)
- # split each line into the four columns output by ps
+ # split each line into the five columns output by ps
- procs = [line.split(None, 3) for line in
+ procs = [line.split(None, 4) for line in
proc.communicate()[0].splitlines()]
- autoserv_procs = [(int(proc[0]), proc[3]) # pid, args
- for proc in procs
- if proc[2] == 'autoserv' # comm
- and proc[1] == '1'] # ppid
- return dict(autoserv_procs)
+ autoserv_procs = {}
+ for proc in procs:
+ # check ppid == 1 for orphans
+ if orphans_only and proc[2] != '1':
+ continue
+ # only root autoserv processes have pgid == pid
+ if (proc[3] == 'autoserv' and # comm
+ proc[1] == proc[0]): # pgid == pid
+ # map pid to args
+ autoserv_procs[int(proc[0])] = proc[4]
+ cls.autoserv_procs_cache = autoserv_procs
+ return autoserv_procs
def recover_queue_entry(self, queue_entry, run_monitor):
@@ -335,7 +352,7 @@
def _recover_processes(self):
- orphans = self.find_orphaned_autoservs()
+ orphans = self.find_autoservs(orphans_only=True)
# first, recover running queue entries
rows = _db.execute("""SELECT * FROM host_queue_entries
@@ -375,7 +392,7 @@
queue_entries = [HostQueueEntry(row=i) for i in rows]
for queue_entry in queue_entries + requeue_entries:
print 'Requeuing running QE %d' % queue_entry.id
- queue_entry.clear_results_dir()
+ queue_entry.clear_results_dir(dont_delete_files=True)
queue_entry.requeue()
@@ -385,10 +402,12 @@
kill_autoserv(pid)
# recover aborting tasks
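+ # remember which hosts get a reboot scheduled here so the reverify
+ # pass below doesn't pick them up a second time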
+ rebooting_host_ids = set()
rows = _db.execute("""SELECT * FROM host_queue_entries
WHERE status='Abort' or status='Aborting'""")
queue_entries = [HostQueueEntry(row=i) for i in rows]
for queue_entry in queue_entries:
+ print 'Recovering aborting QE %d' % queue_entry.id
queue_host = queue_entry.get_host()
reboot_task = RebootTask(queue_host)
verify_task = VerifyTask(host = queue_host)
@@ -398,37 +417,46 @@
queue_entry.set_status('Aborted')
# Secure the host from being picked up
queue_host.set_status('Rebooting')
+ rebooting_host_ids.add(queue_host.id)
# reverify hosts that were in the middle of verify, repair or
# reboot
- rows = _db.execute("""SELECT * FROM hosts
- WHERE locked = 0 AND
- (status = 'Repairing'
- OR status = 'Verifying'
- OR status = 'Rebooting')""")
+ self._reverify_hosts_where("""(status = 'Repairing' OR
+ status = 'Verifying' OR
+ status = 'Rebooting')""",
+ exclude_ids=rebooting_host_ids)
+
+ # finally, recover "Running" hosts with no active queue entries,
+ # although this should never happen
+ message = ('Recovering running host %s - this probably '
+ 'indicates a scheduler bug')
+ self._reverify_hosts_where("""status = 'Running' AND
+ id NOT IN (SELECT host_id
+ FROM host_queue_entries
+ WHERE active)""",
+ print_message=message)
+
+
+ def _reverify_hosts_where(self, where,
+ print_message='Reverifying host %s',
+ exclude_ids=set()):
+ rows = _db.execute('SELECT * FROM hosts WHERE locked = 0 AND ' +
+ where)
hosts = [Host(row=i) for i in rows]
for host in hosts:
- self.add_agent(Agent(tasks=[VerifyTask(host=host)]))
+ if host.id in exclude_ids:
+ continue
+ if print_message is not None:
+ print print_message % host.hostname
+ verify_task = VerifyTask(host = host)
+ self.add_agent(Agent(tasks = [verify_task]))
def _recover_hosts(self):
- # look for both "Repair Failed" hosts, which we expect, and
- # "Running" hosts with no active queue entries, which should
- # never happen
- rows = _db.execute(
- """SELECT * FROM hosts WHERE locked = 0 AND
- (status = 'Repair Failed'
- OR (status = 'Running' AND
- id NOT IN (SELECT host_id FROM host_queue_entries
- WHERE active)))""")
- hosts = [Host(row=i) for i in rows]
- for host in hosts:
- if host.status == 'Running':
- print ('Recovering running host %s - this '
- 'probably indicates a scheduler bug' %
- host.hostname)
- verify_task = VerifyTask(host = host)
- self.add_agent(Agent(tasks = [verify_task]))
+ # recover "Repair Failed" hosts
+ message = 'Reverifying dead host %s'
+ self._reverify_hosts_where("status = 'Repair Failed'",
+ print_message=message)
def _find_more_work(self):
@@ -553,6 +581,7 @@
self.results_dir = os.path.abspath(results_dir)
self.pid_file = os.path.join(results_dir, AUTOSERV_PID_FILE)
self.lost_process = False
+ self.start_time = time.time()
if cmd is None:
# we're reattaching to an existing pid, so don't call
# the superconstructor (we don't want to kick off a new
@@ -568,7 +597,17 @@
return pid
- def check_proc_fs(self, pid):
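+ # spacer is ' ' when matching against ps output and '\x00' when
+ # matching against /proc/<pid>/cmdline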
+ def _check_command_line(self, command_line, spacer=' ',
+ print_error=False):
+ results_dir_arg = spacer.join(('', '-r', self.results_dir, ''))
+ match = results_dir_arg in command_line
+ if print_error and not match:
+ print '%s not found in %s' % (repr(results_dir_arg),
+ repr(command_line))
+ return match
+
+
+ def _check_proc_fs(self, pid):
cmdline_path = os.path.join('/proc', str(pid), 'cmdline')
try:
cmdline_file = open(cmdline_path, 'r')
@@ -577,12 +616,8 @@
except IOError:
return False
# /proc/.../cmdline has \x00 separating args
- results_dir_arg = '\x00-r\x00%s\x00' % self.results_dir
- if results_dir_arg not in cmdline:
- print '%s not found in %s' % (repr(results_dir_arg),
- repr(cmdline))
- return False
- return True
+ return self._check_command_line(cmdline, spacer='\x00',
+ print_error=True)
def read_pidfile(self):
@@ -603,6 +638,14 @@
return pid, exit_status
+ def _find_autoserv_proc(self):
+ autoserv_procs = Dispatcher.find_autoservs()
+ for pid, args in autoserv_procs.iteritems():
+ if self._check_command_line(args):
+ return pid, args
+ return None, None
+
+
def get_pidfile_info(self):
"""\
Returns:
@@ -615,12 +658,18 @@
pid, exit_status = self.read_pidfile()
- # double check autoserv is really running if it says it is
- if (pid is not None and exit_status is None
- and not self.check_proc_fs(pid)):
- # maybe process *just* exited
+ if pid is None:
+ return self._handle_no_pid()
+
+ if exit_status is None:
+ # double check whether or not autoserv is running
+ proc_running = self._check_proc_fs(pid)
+ if proc_running:
+ return pid, exit_status
+
+ # pid but no process - maybe process *just* exited
pid, exit_status = self.read_pidfile()
- if not exit_status:
+ if exit_status is None:
# autoserv exited without writing an exit code
# to the pidfile
error = ('autoserv died without writing exit '
@@ -629,17 +678,55 @@
pid, self.pid_file)
print message
send_notify_email(error, message)
- self.lost_process = True
- self.pid = pid
- self.exit_status = 1
+ self.on_lost_process(pid)
return self.pid, self.exit_status
return pid, exit_status
+ def _handle_no_pid(self):
+ """\
+ Called when no pidfile is found or no pid is in the pidfile.
+ """
+ # is autoserv running?
+ pid, args = self._find_autoserv_proc()
+ if pid is None:
+ # no autoserv process running
+ message = 'No pid found at ' + self.pid_file
+ else:
+ message = ("Process %d (%s) hasn't written pidfile %s" %
+ (pid, args, self.pid_file))
+
+ print message
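+ # give autoserv up to PIDFILE_TIMEOUT to write the pidfile before
+ # declaring the process lost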
+ if time.time() - self.start_time > PIDFILE_TIMEOUT:
+ send_notify_email('Process has failed to write pidfile',
+ message)
+ if pid is not None:
+ kill_autoserv(pid)
+ else:
+ pid = 0
+ self.on_lost_process(pid)
+ return self.pid, self.exit_status
+
+ return None, None
+
+
+ def on_lost_process(self, pid):
+ """\
+ Called when autoserv has exited without writing an exit status,
+ or we've timed out waiting for autoserv to write a pid to the
+ pidfile. In either case, we just return failure and the caller
+ should signal some kind of warning.
+
+ pid is unimportant here, as it shouldn't be used by anyone.
+ """
+ self.lost_process = True
+ self.pid = pid
+ self.exit_status = 1
+
+
def exit_code(self):
pid, exit_code = self.get_pidfile_info()
- assert pid is not None
return exit_code
@@ -1323,15 +1410,21 @@
self.job.stop_all_entries()
- def clear_results_dir(self, results_dir=None):
+ def clear_results_dir(self, results_dir=None, dont_delete_files=False):
results_dir = results_dir or self.results_dir()
if not os.path.exists(results_dir):
return
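+ # when requeuing during recovery, keep the old results around for
+ # debugging instead of deleting them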
+ if dont_delete_files:
+ temp_dir = tempfile.mkdtemp(suffix='.clear_results')
+ print 'Moving results from %s to %s' % (results_dir,
+ temp_dir)
for filename in os.listdir(results_dir):
- if 'queue.log' in filename:
- continue
path = os.path.join(results_dir, filename)
- remove_file_or_dir(path)
+ if dont_delete_files:
+ shutil.move(path,
+ os.path.join(temp_dir, filename))
+ else:
+ remove_file_or_dir(path)
class Job(DBObject):