faft: Move FAFT checkers to a separate module
BUG=chrome-os-partner:35902
TEST=Run FAFT suites and check they still work fine.
Change-Id: Idfb60632b67e7bc92b4ae12a60269e00c7b90b77
Signed-off-by: Vic Yang <victoryang@chromium.org>
Reviewed-on: https://gerrit.chromium.org/gerrit/36989
Reviewed-by: Tom Wai-Hong Tam <waihong@chromium.org>
diff --git a/server/cros/faftsequence.py b/server/cros/faftsequence.py
index e3be601..07dc798 100644
--- a/server/cros/faftsequence.py
+++ b/server/cros/faftsequence.py
@@ -14,6 +14,7 @@
from autotest_lib.client.common_lib import error
from autotest_lib.server.cros import vboot_constants as vboot
from autotest_lib.server.cros.chrome_ec import ChromeEC
+from autotest_lib.server.cros.faft_checkers import FAFTCheckers
from autotest_lib.server.cros.faft_client_attribute import FAFTClientAttribute
from autotest_lib.server.cros.faft_delay_constants import FAFTDelayConstants
from autotest_lib.server.cros.servo_test import ServoTest
@@ -165,6 +166,7 @@
self.faft_client.get_platform_name())
self.delay = FAFTDelayConstants(
self.faft_client.get_platform_name())
+ self.checkers = FAFTCheckers(self, self.faft_client)
if self.client_attr.chrome_ec:
self.ec = ChromeEC(self.servo)
@@ -293,7 +295,7 @@
"""
# DUT works fine and is already in recovery boot, done.
if self._ping_test(self._client.ip, timeout=5):
- if self.crossystem_checker({'mainfw_type': 'recovery'}):
+ if self.checkers.crossystem_checker({'mainfw_type': 'recovery'}):
return
logging.info('Try boot into USB image...')
@@ -460,7 +462,7 @@
self.register_faft_sequence((
{ # Step 1, request recovery boot
- 'state_checker': (self.crossystem_checker, {
+ 'state_checker': (self.checkers.crossystem_checker, {
'mainfw_type': ('developer', 'normal'),
}),
'userspace_action': self.faft_client.request_recovery_boot,
@@ -468,7 +470,7 @@
'install_deps_after_boot': True,
},
{ # Step 2, expected recovery boot
- 'state_checker': (self.crossystem_checker, {
+ 'state_checker': (self.checkers.crossystem_checker, {
'mainfw_type': 'recovery',
'recovery_reason' : vboot.RECOVERY_REASON['US_TEST'],
}),
@@ -478,7 +480,7 @@
'install_deps_after_boot': True,
},
{ # Step 3, expected normal or developer boot (not recovery)
- 'state_checker': (self.crossystem_checker, {
+ 'state_checker': (self.checkers.crossystem_checker, {
'mainfw_type': ('developer', 'normal')
}),
},
@@ -544,185 +546,6 @@
return True
- def _parse_crossystem_output(self, lines):
- """Parse the crossystem output into a dict.
-
- Args:
- lines: The list of crossystem output strings.
-
- Returns:
- A dict which contains the crossystem keys/values.
-
- Raises:
- error.TestError: If wrong format in crossystem output.
-
- >>> seq = FAFTSequence()
- >>> seq._parse_crossystem_output([ \
- "arch = x86 # Platform architecture", \
- "cros_debug = 1 # OS should allow debug", \
- ])
- {'cros_debug': '1', 'arch': 'x86'}
- >>> seq._parse_crossystem_output([ \
- "arch=x86", \
- ])
- Traceback (most recent call last):
- ...
- TestError: Failed to parse crossystem output: arch=x86
- >>> seq._parse_crossystem_output([ \
- "arch = x86 # Platform architecture", \
- "arch = arm # Platform architecture", \
- ])
- Traceback (most recent call last):
- ...
- TestError: Duplicated crossystem key: arch
- """
- pattern = "^([^ =]*) *= *(.*[^ ]) *# [^#]*$"
- parsed_list = {}
- for line in lines:
- matched = re.match(pattern, line.strip())
- if not matched:
- raise error.TestError("Failed to parse crossystem output: %s"
- % line)
- (name, value) = (matched.group(1), matched.group(2))
- if name in parsed_list:
- raise error.TestError("Duplicated crossystem key: %s" % name)
- parsed_list[name] = value
- return parsed_list
-
-
- def crossystem_checker(self, expected_dict):
- """Check the crossystem values matched.
-
- Given an expect_dict which describes the expected crossystem values,
- this function check the current crossystem values are matched or not.
-
- Args:
- expected_dict: A dict which contains the expected values.
-
- Returns:
- True if the crossystem value matched; otherwise, False.
- """
- lines = self.faft_client.run_shell_command_get_output('crossystem')
- got_dict = self._parse_crossystem_output(lines)
- for key in expected_dict:
- if key not in got_dict:
- logging.info('Expected key "%s" not in crossystem result' % key)
- return False
- if isinstance(expected_dict[key], str):
- if got_dict[key] != expected_dict[key]:
- logging.info("Expected '%s' value '%s' but got '%s'" %
- (key, expected_dict[key], got_dict[key]))
- return False
- elif isinstance(expected_dict[key], tuple):
- # Expected value is a tuple of possible actual values.
- if got_dict[key] not in expected_dict[key]:
- logging.info("Expected '%s' values %s but got '%s'" %
- (key, str(expected_dict[key]), got_dict[key]))
- return False
- else:
- logging.info("The expected_dict is neither a str nor a dict.")
- return False
- return True
-
-
- def vdat_flags_checker(self, mask, value):
- """Check the flags from VbSharedData matched.
-
- This function checks the masked flags from VbSharedData using crossystem
- are matched the given value.
-
- Args:
- mask: A bitmask of flags to be matched.
- value: An expected value.
-
- Returns:
- True if the flags matched; otherwise, False.
- """
- lines = self.faft_client.run_shell_command_get_output(
- 'crossystem vdat_flags')
- vdat_flags = int(lines[0], 16)
- if vdat_flags & mask != value:
- logging.info("Expected vdat_flags 0x%x mask 0x%x but got 0x%x" %
- (value, mask, vdat_flags))
- return False
- return True
-
-
- def ro_normal_checker(self, expected_fw=None, twostop=False):
- """Check the current boot uses RO boot.
-
- Args:
- expected_fw: A string of expected firmware, 'A', 'B', or
- None if don't care.
- twostop: True to expect a TwoStop boot; False to expect a RO boot.
-
- Returns:
- True if the currect boot firmware matched and used RO boot;
- otherwise, False.
- """
- crossystem_dict = {'tried_fwb': '0'}
- if expected_fw:
- crossystem_dict['mainfw_act'] = expected_fw.upper()
- if self.check_ec_capability(suppress_warning=True):
- crossystem_dict['ecfw_act'] = ('RW' if twostop else 'RO')
-
- return (self.vdat_flags_checker(
- vboot.VDAT_FLAG_LF_USE_RO_NORMAL,
- 0 if twostop else vboot.VDAT_FLAG_LF_USE_RO_NORMAL) and
- self.crossystem_checker(crossystem_dict))
-
-
- def dev_boot_usb_checker(self, dev_boot_usb=True):
- """Check the current boot is from a developer USB (Ctrl-U trigger).
-
- Args:
- dev_boot_usb: True to expect an USB boot;
- False to expect an internal device boot.
-
- Returns:
- True if the currect boot device matched; otherwise, False.
- """
- return (self.crossystem_checker({'mainfw_type': 'developer'})
- and self.faft_client.is_removable_device_boot() == dev_boot_usb)
-
-
- def root_part_checker(self, expected_part):
- """Check the partition number of the root device matched.
-
- Args:
- expected_part: A string containing the number of the expected root
- partition.
-
- Returns:
- True if the currect root partition number matched; otherwise, False.
- """
- part = self.faft_client.get_root_part()[-1]
- if self.ROOTFS_MAP[expected_part] != part:
- logging.info("Expected root part %s but got %s" %
- (self.ROOTFS_MAP[expected_part], part))
- return False
- return True
-
-
- def ec_act_copy_checker(self, expected_copy):
- """Check the EC running firmware copy matches.
-
- Args:
- expected_copy: A string containing 'RO', 'A', or 'B' indicating
- the expected copy of EC running firmware.
-
- Returns:
- True if the current EC running copy matches; otherwise, False.
- """
- lines = self.faft_client.run_shell_command_get_output('ectool version')
- pattern = re.compile("Firmware copy: (.*)")
- for line in lines:
- matched = pattern.match(line)
- if matched and matched.group(1) == expected_copy:
- return True
- return False
-
-
def check_root_part_on_non_recovery(self, part):
"""Check the partition number of root device and on normal/dev boot.
@@ -730,8 +553,8 @@
True if the root device matched and on normal/dev boot;
otherwise, False.
"""
- return self.root_part_checker(part) and \
- self.crossystem_checker({
+ return self.checkers.root_part_checker(part) and \
+ self.checkers.crossystem_checker({
'mainfw_type': ('normal', 'developer'),
})
@@ -787,7 +610,7 @@
Args:
part: A string of kernel partition number or 'a'/'b'.
"""
- if not self.root_part_checker(part):
+ if not self.checkers.root_part_checker(part):
if self.faft_client.diff_kernel_a_b():
self.copy_kernel_and_rootfs(
from_part=self.OTHER_KERNEL_MAP[part],
@@ -967,14 +790,14 @@
tried_fwb: True if requested in tried_fwb=1; False if tried_fwb=0.
"""
if tried_fwb:
- if not self.crossystem_checker({'tried_fwb': '1'}):
+ if not self.checkers.crossystem_checker({'tried_fwb': '1'}):
logging.info(
'Firmware is not booted with tried_fwb. Reboot into it.')
self.run_faft_step({
'userspace_action': self.faft_client.set_try_fw_b,
})
else:
- if not self.crossystem_checker({'tried_fwb': '0'}):
+ if not self.checkers.crossystem_checker({'tried_fwb': '0'}):
logging.info(
'Firmware is booted with tried_fwb. Reboot to clear.')
self.run_faft_step({})
@@ -1074,10 +897,10 @@
})
if dev_mode:
if (not self.client_attr.keyboard_dev and
- not self.crossystem_checker({'devsw_cur': '1'})):
+ not self.checkers.crossystem_checker({'devsw_cur': '1'})):
logging.info('Dev switch is not on. Now switch it on.')
self.servo.enable_development_mode()
- if not self.crossystem_checker({'devsw_boot': '1',
+ if not self.checkers.crossystem_checker({'devsw_boot': '1',
'mainfw_type': 'developer'}):
logging.info('System is not in dev mode. Reboot into it.')
self.run_faft_step({
@@ -1089,10 +912,10 @@
})
else:
if (not self.client_attr.keyboard_dev and
- not self.crossystem_checker({'devsw_cur': '0'})):
+ not self.checkers.crossystem_checker({'devsw_cur': '0'})):
logging.info('Dev switch is not off. Now switch it off.')
self.servo.disable_development_mode()
- if not self.crossystem_checker({'devsw_boot': '0',
+ if not self.checkers.crossystem_checker({'devsw_boot': '0',
'mainfw_type': 'normal'}):
logging.info('System is not in normal mode. Reboot into it.')
self.run_faft_step({
@@ -1232,7 +1055,7 @@
values. The client ignore TPM rollback, so here forces it to recovery
mode.
"""
- is_dev = self.crossystem_checker({'devsw_boot': '1'})
+ is_dev = self.checkers.crossystem_checker({'devsw_boot': '1'})
if not is_dev:
self.enable_dev_mode_and_reboot()
time.sleep(self.SYNC_DELAY)