Use the new parser library directly inside of autoserv, instead of
repeatedly invoking the parser as a separate process. In order to
actually invoke this capability you need to pass in a -P option, with
the job name (tag).

In order to properly support the parsing of server-side synchronous
jobs you need to make use of job.parallel_simple, a new wrapper around
the standard parallel_simple that adds support for parsing the results
of all of the parallel jobs individually. Unfortunately, complex
server-side synch jobs that use parallel instead of parallel_simple
will be unable to take advantage of this.

Signed-off-by: John Admanski <jadmanski@google.com>



git-svn-id: http://test.kernel.org/svn/autotest/trunk@1446 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/server/server_job.py b/server/server_job.py
index 14b1ec6..5b92cab 100755
--- a/server/server_job.py
+++ b/server/server_job.py
@@ -12,16 +12,13 @@
 """
 
 import os, sys, re, time, select, subprocess, traceback
-import test
-from utils import *
-from common.error import *
 
-# this magic incantation should give us access to a client library
-server_dir = os.path.dirname(__file__)
-client_dir = os.path.join(server_dir, "..", "client", "bin")
-sys.path.append(client_dir)
-import fd_stack
-sys.path.pop()
+from autotest_lib.client.bin import fd_stack
+from autotest_lib.client.common_lib import error
+from autotest_lib.server import test, subcommand
+from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
+from autotest_lib.server.utils import *
+
 
 # load up a control segment
 # these are all stored in <server_dir>/control_segments
@@ -59,7 +56,7 @@
 	host = hosts.SSHHost(machine)
 	at.run(control, host=host)
 
-parallel_simple(run_client, machines)
+job.parallel_simple(run_client, machines)
 """
 
 crashdumps = """
@@ -67,7 +64,7 @@
 	host = hosts.SSHHost(machine, initialize=False)
 	host.get_crashdumps(test_start_time)
 
-parallel_simple(crashdumps, machines, log=False)
+job.parallel_simple(crashdumps, machines, log=False)
 """
 
 reboot_segment="""\
@@ -75,7 +72,7 @@
 	host = hosts.SSHHost(machine, initialize=False)
 	host.reboot()
 
-parallel_simple(reboot, machines, log=False)
+job.parallel_simple(reboot, machines, log=False)
 """
 
 install="""\
@@ -83,7 +80,7 @@
 	host = hosts.SSHHost(machine, initialize=False)
 	host.machine_install()
 
-parallel_simple(install, machines, log=False)
+job.parallel_simple(install, machines, log=False)
 """
 
 # load up the verifier control segment, with an optional site-specific hook
@@ -125,7 +122,7 @@
 	"""
 
 	def __init__(self, control, args, resultdir, label, user, machines,
-								client = False):
+		     client=False, parse_job=""):
 		"""
 			control
 				The control file (pathname of)
@@ -177,10 +174,55 @@
 		job_data.update(get_site_job_data(self))
 		write_keyval(self.resultdir, job_data)
 
+		self.parse_job = parse_job
+		if self.parse_job and len(machines) == 1:
+			self.using_parser = True
+			self.init_parser(resultdir)
+		else:
+			self.using_parser = False
+
+
+	def init_parser(self, resultdir):
+		"""Start the continuous parsing of resultdir. This sets up
+		the database connection and inserts the basic job object into
+		the database if necessary."""
+		# redirect parser debugging to .parse.log
+		parse_log = os.path.join(resultdir, '.parse.log')
+		parse_log = open(parse_log, 'w', 0)
+		tko_utils.redirect_parser_debugging(parse_log)
+		# create a job model object and set up the db
+		self.results_db = tko_db.db(autocommit=True)
+		self.parser = status_lib.parser(0)
+		self.job_model = self.parser.make_job(resultdir)
+		self.parser.start(self.job_model)
+		# check if a job already exists in the db and insert it if
+		# it does not
+		job_idx = self.results_db.find_job(self.parse_job)
+		if job_idx is None:
+			self.results_db.insert_job(self.parse_job,
+						   self.job_model)
+		else:
+			machine_idx = self.results_db.lookup_machine(
+			    self.job_model.machine)
+			self.job_model.index = job_idx
+			self.job_model.machine_idx = machine_idx
+
+
+	def _cleanup(self):
+		"""This should be called after the server job is finished
+		to carry out any remaining cleanup (e.g. flushing any
+		remaining test results to the results db)"""
+		if not self.using_parser:
+			return
+		final_tests = self.parser.end()
+		for test in final_tests:
+			self.results_db.insert_test(self.job_model, test)
+
 
 	def verify(self):
 		if not self.machines:
-			raise AutoservError('No machines specified to verify')
+			raise error.AutoservError(
+			    'No machines specified to verify')
 		try:
 			namespace = {'machines' : self.machines, 'job' : self}
 			exec(preamble + verify, namespace, namespace)
@@ -192,7 +234,8 @@
 
 	def repair(self):
 		if not self.machines:
-			raise AutoservError('No machines specified to repair')
+			raise error.AutoservError(
+			    'No machines specified to repair')
 		namespace = {'machines' : self.machines, 'job' : self}
 		# no matter what happens during repair, go on to try to reverify
 		try:
@@ -221,6 +264,29 @@
 		return False
 
 
+	def parallel_simple(self, function, machines, log=True, timeout=None):
+		"""Run 'function' using parallel_simple, with an extra
+		wrapper to handle the necessary setup for continuous parsing,
+		if possible. If continuous parsing is already properly
+		initialized then this should just work."""
+		is_forking = not (len(machines) == 1 and
+				  self.machines == machines)
+		if self.parse_job and is_forking:
+			def wrapper(machine):
+				self.parse_job += "/" + machine
+				self.using_parser = True
+				self.machines = [machine]
+				self.resultdir = os.path.join(self.resultdir,
+							      machine)
+				self.init_parser(self.resultdir)
+				result = function(machine)
+				self._cleanup()
+				return result
+		else:
+			wrapper = function
+		subcommand.parallel_simple(wrapper, machines, log, timeout)
+
+
 	def run(self, reboot = False, install_before = False,
 		install_after = False, collect_crashdumps = True,
 		namespace = {}):
@@ -324,10 +390,11 @@
 		# We don't want to raise up an error higher if it's just
 		# a TestError - we want to carry on to other tests. Hence
 		# this outer try/except block.
-		except TestError:
+		except error.TestError:
 			pass
 		except:
-			raise TestError(name + ' failed\n' + format_error())
+			raise error.TestError(name + ' failed\n' +
+					      format_error())
 
 		return result
 
@@ -411,18 +478,21 @@
 		"""
 		if subdir:
 			if re.match(r'[\n\t]', subdir):
-				raise ValueError('Invalid character in subdir string')
+				raise ValueError(
+				    'Invalid character in subdir string')
 			substr = subdir
 		else:
 			substr = '----'
-		
+
 		if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$', \
 								status_code):
-			raise ValueError('Invalid status code supplied: %s' % status_code)
+			raise ValueError('Invalid status code supplied: %s' %
+					 status_code)
 		if not operation:
 			operation = '----'
 		if re.match(r'[\n\t]', operation):
-			raise ValueError('Invalid character in operation string')
+			raise ValueError(
+			    'Invalid character in operation string')
 		operation = operation.rstrip()
 		status = status.rstrip()
 		status = re.sub(r"\t", "  ", status)
@@ -455,17 +525,15 @@
 		classes. Unlike __record, this does not write the message
 		to standard output.
 		"""
+		lines = []
 		status_file = os.path.join(self.resultdir, 'status.log')
 		status_log = open(status_file, 'a')
-		need_reparse = False
 		for line in msg.splitlines():
 			line = self.record_prefix + line + '\n'
+			lines.append(line)
 			status_log.write(line)
-			if self.__need_reparse(line):
-				need_reparse = True
 		status_log.close()
-		if need_reparse:
-			self.__parse_status()
+		self.__parse_status(lines)
 
 
 	def __record(self, status_code, subdir, operation, status='',
@@ -489,33 +557,15 @@
 				os.mkdir(test_dir)
 			status_file = os.path.join(test_dir, 'status')
 			open(status_file, "a").write(msg)
-		if self.__need_reparse(msg):
-			self.__parse_status()
+		self.__parse_status([msg])
 
 
-	def __need_reparse(self, line):
-		# the parser will not record results if lines have more than
-		# one level of indentation
-		indent = len(re.search(r"^(\t*)", line).group(1))
-		if indent > 1:
-			return False
-		# we can also skip START lines, as they add nothing
-		line = line.lstrip("\t")
-		if line.startswith("START\t"):
-			return False
-		# otherwise, we should do a parse
-		return True
-
-
-	def __parse_status(self):
-		"""
-		If a .parse.cmd file is present in the results directory,
-		launch the tko parser.
-		"""
-		cmdfile = os.path.join(self.resultdir, '.parse.cmd')
-		if os.path.exists(cmdfile):
-			cmd = open(cmdfile).read().strip()
-			subprocess.Popen(cmd, shell=True)
+	def __parse_status(self, new_lines):
+		if not self.using_parser:
+			return
+		new_tests = self.parser.process_lines(new_lines)
+		for test in new_tests:
+			self.results_db.insert_test(self.job_model, test)
 
 
 # a file-like object for catching stderr from an autotest client and