Use the new parser library directly inside autoserv, instead of
repeatedly invoking the parser as a separate process. To actually
enable this capability you need to pass in the -P option with the
job name (tag).
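
For example (a sketch only; the machine and control-file arguments are
elided and the exact flags depend on how you already invoke autoserv):

    autoserv ... -P <job tag> ... <control file>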
To properly support parsing of server-side synchronous jobs, you need
to use job.parallel_simple, a new wrapper around the standard
parallel_simple that adds support for parsing the results of each
parallel job individually. Unfortunately, complex server-side
synchronous jobs that use parallel instead of parallel_simple cannot
take advantage of this.
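
For illustration, the built-in control segments now dispatch their
per-machine functions through the job object, e.g. the verify segment
below (run_client, hosts and at are defined by the segment's preamble,
which is not shown here):

    def run_client(machine):
        host = hosts.SSHHost(machine)
        at.run(control, host=host)

    job.parallel_simple(run_client, machines)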
Signed-off-by: John Admanski <jadmanski@google.com>
git-svn-id: http://test.kernel.org/svn/autotest/trunk@1446 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/server/server_job.py b/server/server_job.py
index 14b1ec6..5b92cab 100755
--- a/server/server_job.py
+++ b/server/server_job.py
@@ -12,16 +12,13 @@
"""
import os, sys, re, time, select, subprocess, traceback
-import test
-from utils import *
-from common.error import *
-# this magic incantation should give us access to a client library
-server_dir = os.path.dirname(__file__)
-client_dir = os.path.join(server_dir, "..", "client", "bin")
-sys.path.append(client_dir)
-import fd_stack
-sys.path.pop()
+from autotest_lib.client.bin import fd_stack
+from autotest_lib.client.common_lib import error
+from autotest_lib.server import test, subcommand
+from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
+from autotest_lib.server.utils import *
+
# load up a control segment
# these are all stored in <server_dir>/control_segments
@@ -59,7 +56,7 @@
host = hosts.SSHHost(machine)
at.run(control, host=host)
-parallel_simple(run_client, machines)
+job.parallel_simple(run_client, machines)
"""
crashdumps = """
@@ -67,7 +64,7 @@
host = hosts.SSHHost(machine, initialize=False)
host.get_crashdumps(test_start_time)
-parallel_simple(crashdumps, machines, log=False)
+job.parallel_simple(crashdumps, machines, log=False)
"""
reboot_segment="""\
@@ -75,7 +72,7 @@
host = hosts.SSHHost(machine, initialize=False)
host.reboot()
-parallel_simple(reboot, machines, log=False)
+job.parallel_simple(reboot, machines, log=False)
"""
install="""\
@@ -83,7 +80,7 @@
host = hosts.SSHHost(machine, initialize=False)
host.machine_install()
-parallel_simple(install, machines, log=False)
+job.parallel_simple(install, machines, log=False)
"""
# load up the verifier control segment, with an optional site-specific hook
@@ -125,7 +122,7 @@
"""
def __init__(self, control, args, resultdir, label, user, machines,
- client = False):
+ client=False, parse_job=""):
"""
control
The control file (pathname of)
@@ -177,10 +174,55 @@
job_data.update(get_site_job_data(self))
write_keyval(self.resultdir, job_data)
+ self.parse_job = parse_job
+ if self.parse_job and len(machines) == 1:
+ self.using_parser = True
+ self.init_parser(resultdir)
+ else:
+ self.using_parser = False
+
+
+ def init_parser(self, resultdir):
+ """Start the continuous parsing of resultdir. This sets up
+ the database connection and inserts the basic job object into
+ the database if necessary."""
+ # redirect parser debugging to .parse.log
+ parse_log = os.path.join(resultdir, '.parse.log')
+ parse_log = open(parse_log, 'w', 0)
+ tko_utils.redirect_parser_debugging(parse_log)
+ # create a job model object and set up the db
+ self.results_db = tko_db.db(autocommit=True)
+ self.parser = status_lib.parser(0)
+ self.job_model = self.parser.make_job(resultdir)
+ self.parser.start(self.job_model)
+ # check if a job already exists in the db and insert it if
+ # it does not
+ job_idx = self.results_db.find_job(self.parse_job)
+ if job_idx is None:
+ self.results_db.insert_job(self.parse_job,
+ self.job_model)
+ else:
+ machine_idx = self.results_db.lookup_machine(
+ self.job_model.machine)
+ self.job_model.index = job_idx
+ self.job_model.machine_idx = machine_idx
+
+
+ def _cleanup(self):
+ """This should be called after the server job is finished
+ to carry out any remaining cleanup (e.g. flushing any
+ remaining test results to the results db)"""
+ if not self.using_parser:
+ return
+ final_tests = self.parser.end()
+ for test in final_tests:
+ self.results_db.insert_test(self.job_model, test)
+
def verify(self):
if not self.machines:
- raise AutoservError('No machines specified to verify')
+ raise error.AutoservError(
+ 'No machines specified to verify')
try:
namespace = {'machines' : self.machines, 'job' : self}
exec(preamble + verify, namespace, namespace)
@@ -192,7 +234,8 @@
def repair(self):
if not self.machines:
- raise AutoservError('No machines specified to repair')
+ raise error.AutoservError(
+ 'No machines specified to repair')
namespace = {'machines' : self.machines, 'job' : self}
# no matter what happens during repair, go on to try to reverify
try:
@@ -221,6 +264,29 @@
return False
+ def parallel_simple(self, function, machines, log=True, timeout=None):
+ """Run 'function' using parallel_simple, with an extra
+ wrapper to handle the necessary setup for continuous parsing,
+ if possible. If continuous parsing is already properly
+ initialized then this should just work."""
+ is_forking = not (len(machines) == 1 and
+ self.machines == machines)
+ if self.parse_job and is_forking:
+ def wrapper(machine):
+ self.parse_job += "/" + machine
+ self.using_parser = True
+ self.machines = [machine]
+ self.resultdir = os.path.join(self.resultdir,
+ machine)
+ self.init_parser(self.resultdir)
+ result = function(machine)
+ self._cleanup()
+ return result
+ else:
+ wrapper = function
+ subcommand.parallel_simple(wrapper, machines, log, timeout)
+
+
def run(self, reboot = False, install_before = False,
install_after = False, collect_crashdumps = True,
namespace = {}):
@@ -324,10 +390,11 @@
# We don't want to raise up an error higher if it's just
# a TestError - we want to carry on to other tests. Hence
# this outer try/except block.
- except TestError:
+ except error.TestError:
pass
except:
- raise TestError(name + ' failed\n' + format_error())
+ raise error.TestError(name + ' failed\n' +
+ format_error())
return result
@@ -411,18 +478,21 @@
"""
if subdir:
if re.match(r'[\n\t]', subdir):
- raise ValueError('Invalid character in subdir string')
+ raise ValueError(
+ 'Invalid character in subdir string')
substr = subdir
else:
substr = '----'
-
+
if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$', \
status_code):
- raise ValueError('Invalid status code supplied: %s' % status_code)
+ raise ValueError('Invalid status code supplied: %s' %
+ status_code)
if not operation:
operation = '----'
if re.match(r'[\n\t]', operation):
- raise ValueError('Invalid character in operation string')
+ raise ValueError(
+ 'Invalid character in operation string')
operation = operation.rstrip()
status = status.rstrip()
status = re.sub(r"\t", " ", status)
@@ -455,17 +525,15 @@
classes. Unlike __record, this does not write the message
to standard output.
"""
+ lines = []
status_file = os.path.join(self.resultdir, 'status.log')
status_log = open(status_file, 'a')
- need_reparse = False
for line in msg.splitlines():
line = self.record_prefix + line + '\n'
+ lines.append(line)
status_log.write(line)
- if self.__need_reparse(line):
- need_reparse = True
status_log.close()
- if need_reparse:
- self.__parse_status()
+ self.__parse_status(lines)
def __record(self, status_code, subdir, operation, status='',
@@ -489,33 +557,15 @@
os.mkdir(test_dir)
status_file = os.path.join(test_dir, 'status')
open(status_file, "a").write(msg)
- if self.__need_reparse(msg):
- self.__parse_status()
+ self.__parse_status([msg])
- def __need_reparse(self, line):
- # the parser will not record results if lines have more than
- # one level of indentation
- indent = len(re.search(r"^(\t*)", line).group(1))
- if indent > 1:
- return False
- # we can also skip START lines, as they add nothing
- line = line.lstrip("\t")
- if line.startswith("START\t"):
- return False
- # otherwise, we should do a parse
- return True
-
-
- def __parse_status(self):
- """
- If a .parse.cmd file is present in the results directory,
- launch the tko parser.
- """
- cmdfile = os.path.join(self.resultdir, '.parse.cmd')
- if os.path.exists(cmdfile):
- cmd = open(cmdfile).read().strip()
- subprocess.Popen(cmd, shell=True)
+ def __parse_status(self, new_lines):
+ if not self.using_parser:
+ return
+ new_tests = self.parser.process_lines(new_lines)
+ for test in new_tests:
+ self.results_db.insert_test(self.job_model, test)
# a file-like object for catching stderr from an autotest client and