Add version 1 of the TKO parser, modify server_job to use this
version, and modify parse.py to look up the status version number
in order to instantiate the correct parser version.

Basically, it's an implementation of the parser that follows
the spec as outlined in
http://test.kernel.org/autotest/DraftParserSpecification. I did that
by implementing a "version 1" parser alongside the existing
"version 0" parser. The change also adds some code to autotest itself
to log the fact that the status logs being written out follow the
"version 1" specification, so that when re-parsing existing results,
older logs will still be parsed using the old (rather ad-hoc and
difficult to follow) algorithm.

The implementation is fairly similar to the existing version 0
implementation; it still uses all the same files for gathering
results, but these are the core changes:
- instead of being grabbed from build.log, kernel information is
  embedded into the status logs associated with reboots
- if a group is lacking a proper "END" section, the parser implicitly
  assumes the group was somehow aborted
- reboots are wrapped in a group
- a "JOB" result is logged even if nothing bad happens (rather than
  only logging it when something bad happens)
- groups can be nested to an arbitrary depth

Signed-off-by: John Admanski <jadmanski@google.com>




git-svn-id: http://test.kernel.org/svn/autotest/trunk@1511 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/tko/parsers/version_1_unittest.py b/tko/parsers/version_1_unittest.py
new file mode 100644
index 0000000..eb3ad37
--- /dev/null
+++ b/tko/parsers/version_1_unittest.py
@@ -0,0 +1,193 @@
+#!/usr/bin/python
+
+import unittest, datetime, time, md5
+
+import common
+from autotest_lib.tko.parsers import version_1
+
+
+class test_status_line(unittest.TestCase):  # exercises version_1.status_line
+	statuses = ["GOOD", "WARN", "FAIL", "ABORT"]  # values the loops below iterate over
+
+	# A "START" line has type "START" and no status.
+	def test_handles_start(self):
+		line = version_1.status_line(0, "START", "----", "test",
+					     "", {})
+		self.assertEquals(line.type, "START")
+		self.assertEquals(line.status, None)
+
+	# A bare status word yields type "STATUS" carrying that status.
+	def test_handles_status(self):
+		for stat in self.statuses:
+			line = version_1.status_line(0, stat, "----", "test",
+						     "", {})
+			self.assertEquals(line.type, "STATUS")
+			self.assertEquals(line.status, stat)
+
+	# "END <status>" yields type "END" and keeps the status.
+	def test_handles_endstatus(self):
+		for stat in self.statuses:
+			line = version_1.status_line(0, "END " + stat, "----",
+						     "test", "", {})
+			self.assertEquals(line.type, "END")
+			self.assertEquals(line.status, stat)
+
+	# An unrecognised prefix ("BAD <status>") must raise AssertionError.
+	def test_fails_on_bad_status(self):
+		for stat in self.statuses:
+			self.assertRaises(AssertionError,
+					  version_1.status_line, 0,
+					  "BAD " + stat, "----", "test",
+					  "", {})
+
+	# Constructor arguments are readable back as attributes, unchanged.
+	def test_saves_all_fields(self):
+		line = version_1.status_line(5, "GOOD", "subdir_name",
+					     "test_name", "my reason here",
+					     {"key1": "value",
+					      "key2": "another value",
+					      "key3": "value3"})
+		self.assertEquals(line.indent, 5)
+		self.assertEquals(line.status, "GOOD")
+		self.assertEquals(line.subdir, "subdir_name")
+		self.assertEquals(line.testname, "test_name")
+		self.assertEquals(line.reason, "my reason here")
+		self.assertEquals(line.optional_fields,
+				  {"key1": "value", "key2": "another value",
+				   "key3": "value3"})
+
+	# A subdir given as "----" is exposed as None.
+	def test_parses_blank_subdir(self):
+		line = version_1.status_line(0, "GOOD", "----", "test",
+					     "", {})
+		self.assertEquals(line.subdir, None)
+
+	# A testname given as "----" is exposed as None.
+	def test_parses_blank_testname(self):
+		line = version_1.status_line(0, "GOOD", "subdir", "----",
+					     "", {})
+		self.assertEquals(line.testname, None)
+
+	# parse_line: 3 leading tabs -> indent 3; key=value fields precede the reason.
+	def test_parse_line_smoketest(self):
+		input_data = ("\t\t\tGOOD\t----\t----\t"
+			      "field1=val1\tfield2=val2\tTest Passed")
+		line = version_1.status_line.parse_line(input_data)
+		self.assertEquals(line.indent, 3)
+		self.assertEquals(line.type, "STATUS")
+		self.assertEquals(line.status, "GOOD")
+		self.assertEquals(line.subdir, None)
+		self.assertEquals(line.testname, None)
+		self.assertEquals(line.reason, "Test Passed")
+		self.assertEquals(line.optional_fields,
+				  {"field1": "val1", "field2": "val2"})
+
+	def test_parse_line_handles_newline(self):  # same result with or without a trailing newline
+		input_data = ("\t\tGOOD\t----\t----\t"
+			      "field1=val1\tfield2=val2\tNo newline here!")
+		for suffix in ("", "\n"):
+			line = version_1.status_line.parse_line(input_data +
+								suffix)
+			self.assertEquals(line.indent, 2)
+			self.assertEquals(line.type, "STATUS")
+			self.assertEquals(line.status, "GOOD")
+			self.assertEquals(line.subdir, None)
+			self.assertEquals(line.testname, None)
+			self.assertEquals(line.reason, "No newline here!")
+			self.assertEquals(line.optional_fields,
+					  {"field1": "val1",
+					   "field2": "val2"})
+
+	# Space-indented input parses to None; the same text with the spaces stripped parses.
+	def test_parse_line_fails_on_untabbed_lines(self):
+		input_data = "   GOOD\trandom\tfields\tof text"
+		line = version_1.status_line.parse_line(input_data)
+		self.assertEquals(line, None)
+		line = version_1.status_line.parse_line(input_data.lstrip())
+		self.assertEquals(line.indent, 0)
+		self.assertEquals(line.type, "STATUS")
+		self.assertEquals(line.status, "GOOD")
+		self.assertEquals(line.subdir, "random")
+		self.assertEquals(line.testname, "fields")
+		self.assertEquals(line.reason, "of text")
+		self.assertEquals(line.optional_fields, {})
+
+	# A middle field that is not key=value ("field3") must raise AssertionError.
+	def test_parse_line_fails_on_bad_optional_fields(self):
+		input_data = "GOOD\tfield1\tfield2\tfield3\tfield4"
+		self.assertRaises(AssertionError,
+				  version_1.status_line.parse_line,
+				  input_data)
+
+	# is_successful_reboot returns True when passed "GOOD" or "WARN".
+	def test_good_reboot_passes_success_test(self):
+		line = version_1.status_line(0, "NOSTATUS", None, "reboot",
+					     "reboot success", {})
+		self.assertEquals(line.is_successful_reboot("GOOD"), True)
+		self.assertEquals(line.is_successful_reboot("WARN"), True)
+
+	# NOTE(review): despite the name, this asserts False for "FAIL" and "ABORT".
+	def test_bad_reboot_passes_success_test(self):
+		line = version_1.status_line(0, "NOSTATUS", None, "reboot",
+					     "reboot success", {})
+		self.assertEquals(line.is_successful_reboot("FAIL"), False)
+		self.assertEquals(line.is_successful_reboot("ABORT"), False)
+
+	# get_kernel: base from "kernel", patches from patch0/patch1, hash checked via md5.
+	def test_get_kernel_returns_kernel_plus_patches(self):
+		line = version_1.status_line(0, "GOOD", "subdir", "testname",
+					     "reason text",
+					     {"kernel": "2.6.24-rc40",
+					      "patch0": "first_patch 0 0",
+					      "patch1": "another_patch 0 0"})
+		kern = line.get_kernel()
+		kernel_hash = md5.new("2.6.24-rc40,0,0").hexdigest()
+		self.assertEquals(kern.base, "2.6.24-rc40")
+		self.assertEquals(kern.patches[0].spec, "first_patch")
+		self.assertEquals(kern.patches[1].spec, "another_patch")
+		self.assertEquals(len(kern.patches), 2)
+		self.assertEquals(kern.kernel_hash, kernel_hash)
+
+	# With patch1 absent, patch2 is ignored: only patch0 is reported.
+	def test_get_kernel_ignores_out_of_sequence_patches(self):
+		line = version_1.status_line(0, "GOOD", "subdir", "testname",
+					     "reason text",
+					     {"kernel": "2.6.24-rc40",
+					      "patch0": "first_patch 0 0",
+					      "patch2": "another_patch 0 0"})
+		kern = line.get_kernel()
+		kernel_hash = md5.new("2.6.24-rc40,0").hexdigest()
+		self.assertEquals(kern.base, "2.6.24-rc40")
+		self.assertEquals(kern.patches[0].spec, "first_patch")
+		self.assertEquals(len(kern.patches), 1)
+		self.assertEquals(kern.kernel_hash, kernel_hash)
+
+	# No "kernel" field: base and hash are "UNKNOWN" and patches is empty.
+	def test_get_kernel_returns_unknown_with_no_kernel(self):
+		line = version_1.status_line(0, "GOOD", "subdir", "testname",
+					     "reason text",
+					     {"patch0": "first_patch 0 0",
+					      "patch2": "another_patch 0 0"})
+		kern = line.get_kernel()
+		self.assertEquals(kern.base, "UNKNOWN")
+		self.assertEquals(kern.patches, [])
+		self.assertEquals(kern.kernel_hash, "UNKNOWN")
+
+	# "16200" (4h30m in seconds) must come back as the matching local-time datetime.
+	def test_get_timestamp_returns_timestamp_field(self):
+		timestamp = datetime.datetime(1970, 1, 1, 4, 30)
+		timestamp -= datetime.timedelta(seconds=time.timezone)  # NOTE(review): ignores DST/historical offsets - confirm
+		line = version_1.status_line(0, "GOOD", "subdir", "testname",
+					     "reason text",
+					     {"timestamp": "16200"})
+		self.assertEquals(timestamp, line.get_timestamp())
+
+	# With no "timestamp" field, get_timestamp returns None.
+	def test_get_timestamp_returns_none_on_missing_field(self):
+		line = version_1.status_line(0, "GOOD", "subdir", "testname",
+					     "reason text", {})
+		self.assertEquals(None, line.get_timestamp())
+
+
+if __name__ == "__main__":  # run the tests when executed as a script
+	unittest.main()