Move the code that pulls client-side status logs over to the server
into server_job, so that it can be integrated with the serial console
warning monitoring.

Signed-off-by: John Admanski <jadmanski@google.com>

git-svn-id: http://test.kernel.org/svn/autotest/trunk@1270 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/server/server_job.py b/server/server_job.py
index f31ca83..0c272ec 100755
--- a/server/server_job.py
+++ b/server/server_job.py
@@ -339,7 +339,16 @@
 		Executing this method will trigger the logging of all new
 		warnings to date from the various console loggers.
 		"""
-		# poll the loggers for any new console warnings to log
+		# poll all our warning loggers for new warnings
+		warnings = self._read_warnings()
+		for timestamp, msg in warnings:
+			self.__record("WARN", None, None, msg, timestamp)
+
+		# write out the actual status log line
+		self.__record(status_code, subdir, operation, status)
+
+
+	def _read_warnings(self):
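+		"""
+		Poll all the warning loggers and pull out any new warnings
+		that have been logged. Returns a list of (timestamp, message)
+		pairs, sorted by timestamp.
+		"""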
 		warnings = []
 		while True:
 			# pull in a line of output from every logger that has
@@ -363,23 +372,17 @@
 			if not loggers:
 				break
 
-		# write out all of the warnings we accumulated
-		warnings.sort() # sort into timestamp order
-		for timestamp, msg in warnings:
-			self.__record("WARN", None, None, msg, timestamp)
-
-		# write out the actual status log line
-		self.__record(status_code, subdir, operation, status)
+		# sort into timestamp order
+		warnings.sort()
+		return warnings
 
 
-	def __record(self, status_code, subdir, operation, status='',
-		     epoch_time=None):
+	def _render_record(self, status_code, subdir, operation, status='',
+			   epoch_time=None, record_prefix=None):
 		"""
-		Actual function for recording a single line into the status
-		logs. Should never be called directly, only by job.record as
-		this would bypass the console monitor logging.
+		Internal function for generating a record to be written into a
+		status log. For use by server_job.* classes only.
 		"""
-
 		if subdir:
 			if re.match(r'[\n\t]', subdir):
 				raise ValueError('Invalid character in subdir string')
@@ -409,16 +412,181 @@
 		local_time_str = time.strftime("localtime=%b %d %H:%M:%S",
 					       local_time)
 
+		if record_prefix is None:
+			record_prefix = self.record_prefix
+
 		msg = '\t'.join(str(x) for x in (status_code, substr, operation,
 						 epoch_time_str, local_time_str,
 						 status))
+		return record_prefix + msg + '\n'
+
+
+	def _record_prerendered(self, msg):
+		"""
+		Record a pre-rendered msg into the status logs. The only
+		change this makes to the message is to prepend the local
+		indentation. Should not be called outside of server_job.*
+		classes. Unlike __record, this does not write the message
+		to standard output.
+		"""
+		status_file = os.path.join(self.resultdir, 'status.log')
+		status_log = open(status_file, 'a')
+		for line in msg.splitlines():
+			line = self.record_prefix + line + '\n'
+			status_log.write(line)
+		status_log.close()
+
+
+	def __record(self, status_code, subdir, operation, status='',
+		     epoch_time=None):
+		"""
+		The actual function for recording a single line into the
+		status logs. Should never be called directly, only by
+		job.record, since calling it directly would bypass the
+		console monitor logging.
+		"""
+
+		msg = self._render_record(status_code, subdir, operation,
+					  status, epoch_time)
 
 		status_file = os.path.join(self.resultdir, 'status.log')
-		print msg
-		open(status_file, "a").write(self.record_prefix + msg + "\n")
+		sys.stdout.write(msg)
+		open(status_file, "a").write(msg)
 		if subdir:
 			test_dir = os.path.join(self.resultdir, subdir)
 			if not os.path.exists(test_dir):
 				os.mkdir(test_dir)
 			status_file = os.path.join(test_dir, 'status')
-			open(status_file, "a").write(msg + "\n")
+			open(status_file, "a").write(msg)
+
+
+# a file-like object for catching stderr from an autotest client and
+# extracting status logs from it
+class client_logger(object):
+	"""Partial file object to write to both stdout and
+	the status log file.  We only implement those methods
+	utils.run() actually calls.
+	"""
+	parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
+	extract_indent = re.compile(r"^(\t*).*$")
+
+	def __init__(self, job):
+		self.job = job
+		self.leftover = ""
+		self.last_line = ""
+		self.logs = {}
+
+
+	def _process_log_dict(self, log_dict):
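+		"""Flatten a nested dict of logs into a single ordered list
+		of lines, emitting this level's own "logs" entries before
+		those of any sub-tags (visited in sorted order). Note that
+		this consumes log_dict as it goes."""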
+		log_list = log_dict.pop("logs", [])
+		for key in sorted(log_dict.iterkeys()):
+			log_list += self._process_log_dict(log_dict.pop(key))
+		return log_list
+
+
+	def _process_logs(self):
+		"""Go through the accumulated logs in self.logs and print them
+		out to stdout and the status log. Note that this processes
+		logs in an ordering where:
+
+		1) logs to different tags are never interleaved
+		2) logs to x.y come before logs to x.y.z for all z
+		3) logs to x.y come before x.z whenever y < z
+
+		Note that this will in general not be the same as the
+		chronological ordering of the logs. However, if a
+		chronological ordering is desired, it can be reconstructed
+		from the status log by looking at the timestamp lines."""
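+		# For example (tags illustrative): lines buffered under the
+		# tags "0", "0.1", "0.2" and "1" are flushed in the order
+		# 0, 0.1, 0.2, 1.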
+		log_list = self._process_log_dict(self.logs)
+		for line in log_list:
+			self.job._record_prerendered(line + '\n')
+		if log_list:
+			self.last_line = log_list[-1]
+
+
+	def _process_quoted_line(self, tag, line):
+		"""Process a line quoted with an AUTOTEST_STATUS flag. If the
+		tag is blank then push out all the data accumulated in
+		self.logs, followed by the new line itself. If the tag is
+		not blank, buffer the line in self.logs for handling
+		later."""
+		print line
+		if tag == "":
+			self._process_logs()
+			self.job._record_prerendered(line + '\n')
+			self.last_line = line
+		else:
+			tag_parts = [int(x) for x in tag.split(".")]
+			log_dict = self.logs
+			for part in tag_parts:
+				log_dict = log_dict.setdefault(part, {})
+			log_list = log_dict.setdefault("logs", [])
+			log_list.append(line)
+
+
+	def _process_line(self, line):
+		"""Write out a line of data to the appropriate stream. Status
+		lines sent by autotest will be prefixed with
+		"AUTOTEST_STATUS", and all other lines are ssh error
+		messages."""
+		match = self.parser.search(line)
+		if match:
+			tag, line = match.groups()
+			self._process_quoted_line(tag, line)
+		else:
+			print >> sys.stderr, line
+
+
+	def _format_warnings(self, last_line, warnings):
+		indent = self.extract_indent.match(last_line).group(1)
+		return [self.job._render_record("WARN", None, None, msg,
+						timestamp, indent).rstrip('\n')
+			for timestamp, msg in warnings]
+
+
+	def _process_warnings(self, last_line, log_dict, warnings):
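+		"""Insert rendered warning lines into the in-memory log
+		buffers. If this level of log_dict has no sub-jobs, the
+		warnings are appended (and echoed to stdout) here; otherwise
+		they are pushed down into every sub-job so that each one
+		records them at its own indentation."""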
+		if log_dict.keys() in ([], ["logs"]):
+			# there are no sub-jobs, just append the warnings here
+			warnings = self._format_warnings(last_line, warnings)
+			log_list = log_dict.setdefault("logs", [])
+			log_list += warnings
+			for warning in warnings:
+				sys.stdout.write(warning + '\n')
+		else:
+			# there are sub-jobs, so put the warnings in there
+			log_list = log_dict.get("logs", [])
+			if log_list:
+				last_line = log_list[-1]
+			for key in sorted(log_dict.iterkeys()):
+				if key != "logs":
+					self._process_warnings(last_line,
+							       log_dict[key],
+							       warnings)
+
+
+	def write(self, data):
+		# first check for any new console warnings
+		warnings = self.job._read_warnings()
+		self._process_warnings(self.last_line, self.logs, warnings)
+		# now process the newest data written out
+		data = self.leftover + data
+		lines = data.split("\n")
+		# process every line but the last one
+		for line in lines[:-1]:
+			self._process_line(line)
+		# save the last line for later processing
+		# since we may not have the whole line yet
+		self.leftover = lines[-1]
+
+
+	def flush(self):
+		sys.stdout.flush()
+		sys.stderr.flush()
+
+
+	def close(self):
+		if self.leftover:
+			self._process_line(self.leftover)
+		self._process_logs()
+		self.flush()