Move the code responsible for pulling client-side status logs over to
the server into server_job so that it can be integrated with the
serial console warning monitoring.
Signed-off-by: John Admanski <jadmanski@google.com>
git-svn-id: http://test.kernel.org/svn/autotest/trunk@1270 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/server/server_job.py b/server/server_job.py
index f31ca83..0c272ec 100755
--- a/server/server_job.py
+++ b/server/server_job.py
@@ -339,7 +339,16 @@
Executing this method will trigger the logging of all new
warnings to date from the various console loggers.
"""
- # poll the loggers for any new console warnings to log
+ # poll all our warning loggers for new warnings
+ warnings = self._read_warnings()
+ for timestamp, msg in warnings:
+ self.__record("WARN", None, None, msg, timestamp)
+
+ # write out the actual status log line
+ self.__record(status_code, subdir, operation, status)
+
+
+ def _read_warnings(self):
warnings = []
while True:
# pull in a line of output from every logger that has
@@ -363,23 +372,17 @@
if not loggers:
break
- # write out all of the warnings we accumulated
- warnings.sort() # sort into timestamp order
- for timestamp, msg in warnings:
- self.__record("WARN", None, None, msg, timestamp)
-
- # write out the actual status log line
- self.__record(status_code, subdir, operation, status)
+ # sort into timestamp order
+ warnings.sort()
+ return warnings
- def __record(self, status_code, subdir, operation, status='',
- epoch_time=None):
+ def _render_record(self, status_code, subdir, operation, status='',
+ epoch_time=None, record_prefix=None):
"""
- Actual function for recording a single line into the status
- logs. Should never be called directly, only by job.record as
- this would bypass the console monitor logging.
+ Internal function to generate a record to be written into a
+ status log. For use by server_job.* classes only.
"""
-
if subdir:
if re.match(r'[\n\t]', subdir):
raise ValueError('Invalid character in subdir string')
@@ -409,16 +412,181 @@
local_time_str = time.strftime("localtime=%b %d %H:%M:%S",
local_time)
+ if record_prefix is None:
+ record_prefix = self.record_prefix
+
msg = '\t'.join(str(x) for x in (status_code, substr, operation,
epoch_time_str, local_time_str,
status))
+ return record_prefix + msg + '\n'
+
+
+ def _record_prerendered(self, msg):
+ """
+ Record a pre-rendered msg into the status logs. The only
+ change this makes to the message is to add on the local
+ indentation. Should not be called outside of server_job.*
+ classes. Unlike __record, this does not write the message
+ to standard output.
+ """
+ status_file = os.path.join(self.resultdir, 'status.log')
+ status_log = open(status_file, 'a')
+ for line in msg.splitlines():
+ line = self.record_prefix + line + '\n'
+ status_log.write(line)
+ status_log.close()
+
+
+
+ def __record(self, status_code, subdir, operation, status='',
+ epoch_time=None):
+ """
+ Actual function for recording a single line into the status
+ logs. Should never be called directly, only by job.record as
+ this would bypass the console monitor logging.
+ """
+
+ msg = self._render_record(status_code, subdir, operation,
+ status, epoch_time)
+
status_file = os.path.join(self.resultdir, 'status.log')
- print msg
- open(status_file, "a").write(self.record_prefix + msg + "\n")
+ sys.stdout.write(msg)
+ open(status_file, "a").write(msg)
if subdir:
test_dir = os.path.join(self.resultdir, subdir)
if not os.path.exists(test_dir):
os.mkdir(test_dir)
status_file = os.path.join(test_dir, 'status')
- open(status_file, "a").write(msg + "\n")
+ open(status_file, "a").write(msg)
+
+
+# a file-like object for catching stderr from an autotest client and
+# extracting status logs from it
+class client_logger(object):
+ """Partial file object to write to both stdout and
+ the status log file. We only implement those methods
+ utils.run() actually calls.
+ """
+ parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
+ extract_indent = re.compile(r"^(\t*).*$")
+
+ def __init__(self, job):
+ self.job = job
+ self.leftover = ""
+ self.last_line = ""
+ self.logs = {}
+
+
+ def _process_log_dict(self, log_dict):
+ log_list = log_dict.pop("logs", [])
+ for key in sorted(log_dict.iterkeys()):
+ log_list += self._process_log_dict(log_dict.pop(key))
+ return log_list
+
+
+ def _process_logs(self):
+ """Go through the accumulated logs in self.logs and print them
+ out to stdout and the status log. Note that this processes
+ logs in an ordering where:
+
+ 1) logs to different tags are never interleaved
+ 2) logs to x.y come before logs to x.y.z for all z
+ 3) logs to x.y come before x.z whenever y < z
+
+ Note that this will in general not be the same as the
+ chronological ordering of the logs. However, if a chronological
+ ordering is desired, it can be reconstructed from the
+ status log by looking at timestamp lines."""
+ log_list = self._process_log_dict(self.logs)
+ for line in log_list:
+ self.job._record_prerendered(line + '\n')
+ if log_list:
+ self.last_line = log_list[-1]
+
+
+ def _process_quoted_line(self, tag, line):
+ """Process a line quoted with an AUTOTEST_STATUS flag. If the
+ tag is blank then we want to push out all the data we've been
+ building up in self.logs, and then the newest line. If the
+ tag is not blank, then push the line into the logs for handling
+ later."""
+ print line
+ if tag == "":
+ self._process_logs()
+ self.job._record_prerendered(line + '\n')
+ self.last_line = line
+ else:
+ tag_parts = [int(x) for x in tag.split(".")]
+ log_dict = self.logs
+ for part in tag_parts:
+ log_dict = log_dict.setdefault(part, {})
+ log_list = log_dict.setdefault("logs", [])
+ log_list.append(line)
+
+
+ def _process_line(self, line):
+ """Write out a line of data to the appropriate stream. Status
+ lines sent by autotest will be prepended with
+ "AUTOTEST_STATUS", and all other lines are ssh error
+ messages."""
+ match = self.parser.search(line)
+ if match:
+ tag, line = match.groups()
+ self._process_quoted_line(tag, line)
+ else:
+ print >> sys.stderr, line
+
+
+ def _format_warnings(self, last_line, warnings):
+ indent = self.extract_indent.match(last_line).group(1)
+ return [self.job._render_record("WARN", None, None, msg,
+ timestamp, indent).rstrip('\n')
+ for timestamp, msg in warnings]
+
+
+ def _process_warnings(self, last_line, log_dict, warnings):
+ if log_dict.keys() in ([], ["logs"]):
+ # there are no sub-jobs, just append the warnings here
+ warnings = self._format_warnings(last_line, warnings)
+ log_list = log_dict.setdefault("logs", [])
+ log_list += warnings
+ for warning in warnings:
+ sys.stdout.write(warning + '\n')
+ else:
+ # there are sub-jobs, so put the warnings in there
+ log_list = log_dict.get("logs", [])
+ if log_list:
+ last_line = log_list[-1]
+ for key in sorted(log_dict.iterkeys()):
+ if key != "logs":
+ self._process_warnings(last_line,
+ log_dict[key],
+ warnings)
+
+
+ def write(self, data):
+ # first check for any new console warnings
+ warnings = self.job._read_warnings()
+ self._process_warnings(self.last_line, self.logs, warnings)
+ # now process the newest data written out
+ data = self.leftover + data
+ lines = data.split("\n")
+ # process every line but the last one
+ for line in lines[:-1]:
+ self._process_line(line)
+ # save the last line for later processing
+ # since we may not have the whole line yet
+ self.leftover = lines[-1]
+
+
+ def flush(self):
+ sys.stdout.flush()
+ sys.stderr.flush()
+
+
+ def close(self):
+ if self.leftover:
+ self._process_line(self.leftover)
+ self._process_logs()
+ self.flush()