FAFT rework: remove having to provide FAFT steps in special dict object.
Continuing changes for FAFT rework: chromium:371185
1. Removed functions specific to handling of the faft step template object,
including the function that processed the invidual steps in that object.
2. Renamed cold_reboot and warm_reboot functions to better reflect their
purpose and to open up adding new reboot functions which can be used by
firmware tests.
3. Replaced occurances of functions which relied on the old faft steps object
with bare function calls.
4. Created a get_bootid function out of the code which was previously in the
step processing function.
BUG=chromium:371185
TEST=Ran at desk against a wolf device a modified version of
fimrware_FAFTSetup which uses the new scheme.
Change-Id: I8a48dd128c6b5b293b26db166de4d4732aa17c59
Reviewed-on: https://chromium-review.googlesource.com/198864
Reviewed-by: Wai-Hong Tam <waihong@chromium.org>
Tested-by: Yusuf Mohsinally <mohsinally@chromium.org>
Commit-Queue: Yusuf Mohsinally <mohsinally@chromium.org>
diff --git a/server/cros/faft/firmware_test.py b/server/cros/faft/firmware_test.py
index 1ad85f7..988fd69 100644
--- a/server/cros/faft/firmware_test.py
+++ b/server/cros/faft/firmware_test.py
@@ -83,54 +83,9 @@
class FirmwareTest(FAFTBase):
"""
- The base class of Fully Automated Firmware Test Sequence.
+ Base class that sets up helper objects/functions for firmware tests.
- Many firmware tests require several reboot cycles and verify the resulted
- system states. To do that, an Autotest test case should detailly handle
- every action on each step. It makes the test case hard to read and many
- duplicated code. The base class FirmwareTest is to solve this problem.
-
- The actions of one reboot cycle is defined in a dict, namely FAFT_STEP.
- There are four functions in the FAFT_STEP dict:
- state_checker: a function to check the current is valid or not,
- returning True if valid, otherwise, False to break the whole
- test sequence.
- userspace_action: a function to describe the action ran in userspace.
- reboot_action: a function to do reboot, default: sync_and_warm_reboot.
- firmware_action: a function to describe the action ran after reboot.
-
- And configurations:
- install_deps_after_boot: if True, install the Autotest dependency after
- boot; otherwise, do nothing. It is for the cases of recovery mode
- test. The test boots a USB/SD image instead of an internal image.
- The previous installed Autotest dependency on the internal image
- is lost. So need to install it again.
-
- The default FAFT_STEP checks nothing in state_checker and does nothing in
- userspace_action and firmware_action. Its reboot_action is a hardware
- reboot. You can change the default FAFT_STEP by calling
- self.register_faft_template(FAFT_STEP).
-
- A FAFT test case consists of several FAFT_STEP's, namely FAFT_SEQUENCE.
- FAFT_SEQUENCE is an array of FAFT_STEP's. Any missing fields on FAFT_STEP
- fall back to default.
-
- In the run_once(), it should register and run FAFT_SEQUENCE like:
- def run_once(self):
- self.register_faft_sequence(FAFT_SEQUENCE)
- self.run_faft_sequnce()
-
- Note that in the last step, we only run state_checker. The
- userspace_action, reboot_action, and firmware_action are not executed.
-
- Attributes:
- _faft_template: The default FAFT_STEP of each step. The actions would
- be over-written if the registered FAFT_SEQUENCE is valid.
- _faft_sequence: The registered FAFT_SEQUENCE.
- _install_image_path: The URL or the path on the host to the Chrome OS
- test image to be installed.
- _firmware_update: Boolean. True if firmware update needed after
- installing the image.
+ TODO: add documentaion as the FAFT rework progresses.
"""
version = 1
@@ -150,8 +105,6 @@
_HTTP_PREFIX = 'http://'
_DEVSERVER_PORT = '8090'
- _faft_template = {}
- _faft_sequence = ()
_install_image_path = None
_firmware_update = False
@@ -198,12 +151,6 @@
super(FirmwareTest, self).initialize(host)
self.run_id = str(uuid.uuid4())
logging.info('FirmwareTest initialize begin (id=%s)', self.run_id)
- self.register_faft_template({
- 'state_checker': (None),
- 'userspace_action': (None),
- 'reboot_action': (self.sync_and_warm_reboot),
- 'firmware_action': (None)
- })
# Parse arguments from command line
args = {}
self.power_control = host.POWER_CONTROL_RPM
@@ -251,8 +198,6 @@
def cleanup(self):
"""Autotest cleanup function."""
# Unset state checker in case it's set by subclass
- self.register_faft_template({'state_checker': None})
-
logging.info('FirmwareTest cleaning up (id=%s)', self.run_id)
try:
self.faft_client.system.is_available()
@@ -267,8 +212,6 @@
self.record_servo_log()
self.record_faft_client_log()
self.cleanup_uart_capture()
- self._faft_sequence = ()
- self._faft_template = {}
super(FirmwareTest, self).cleanup()
logging.info('FirmwareTest cleanup done (id=%s)', self.run_id)
@@ -326,7 +269,7 @@
"""
# DUT may halt on a firmware screen. Try cold reboot.
logging.info('Try cold reboot...')
- self.cold_reboot()
+ self.reboot_cold_trigger()
self.wait_for_client_offline()
self.wait_dev_screen_and_ctrl_d()
try:
@@ -405,10 +348,6 @@
self.record_uart_capture()
next_checker_matched = False
- if next_step is not None:
- next_test = {}
- next_test.update(self._faft_template)
- next_test.update(next_step)
# TODO(waihong@chromium.org): Implement replugging the Ethernet to
# identify if it is a network flaky.
@@ -647,7 +586,7 @@
}),
'userspace_action': (self.faft_client.system.run_shell_command,
install_cmd),
- 'reboot_action': self.cold_reboot,
+ 'reboot_action': self.reboot_cold_trigger,
'install_deps_after_boot': True,
},
{ # Step 3, expected normal or developer boot (not recovery)
@@ -701,9 +640,8 @@
self.faft_client.bios.reload()
# If changing FORCE_DEV_SWITCH_ON flag, reboot to get a clear state
if ((gbb_flags ^ new_flags) & vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON):
- self.run_faft_step({
- 'firmware_action': self.wait_dev_screen_and_ctrl_d,
- })
+ self.reboot_warm_trigger()
+ self.wait_dev_screen_and_ctrl_d()
def clear_set_gbb_flags(self, clear_mask, set_mask):
"""Clear and set the GBB flags in the current flashrom.
@@ -802,9 +740,7 @@
self.copy_kernel_and_rootfs(
from_part=self.OTHER_KERNEL_MAP[part],
to_part=part)
- self.run_faft_step({
- 'userspace_action': (self.reset_and_prioritize_kernel, part),
- })
+ self.reset_and_prioritize_kernel(part)
def set_hardware_write_protect(self, enable):
"""Set hardware write protect pin.
@@ -872,10 +808,8 @@
if ec_wp != self._old_ec_wp:
logging.info('The test required EC is %swrite-protected. Reboot '
'and flip the state.', '' if ec_wp else 'not ')
- self.run_faft_step({
- 'reboot_action': (self.set_ec_write_protect_and_reboot, ec_wp),
- 'firmware_action': self.wait_dev_screen_and_ctrl_d,
- })
+ self.do_reboot_action(self.set_ec_write_protect_and_reboot, ec_wp)
+ self.wait_dev_screen_and_ctrl_d()
def restore_ec_write_protect(self):
"""Restore the original EC write-protection."""
@@ -883,12 +817,10 @@
return
if not self.checkers.crossystem_checker(
{'wpsw_boot': '1' if self._old_ec_wp else '0'}):
- logging.info('Restore the original EC write protection and reboot.')
- self.run_faft_step({
- 'reboot_action': (self.set_ec_write_protect_and_reboot,
- self._old_ec_wp),
- 'firmware_action': self.wait_dev_screen_and_ctrl_d,
- })
+ logging.info('Restore original EC write protection and reboot.')
+ self.do_reboot_action(self.set_ec_write_protect_and_reboot,
+ self._old_ec_wp)
+ self.wait_dev_screen_and_ctrl_d()
def press_ctrl_d(self, press_secs=''):
"""Send Ctrl-D key to DUT.
@@ -1105,14 +1037,11 @@
if not self.checkers.crossystem_checker({'tried_fwb': '1'}):
logging.info(
'Firmware is not booted with tried_fwb. Reboot into it.')
- self.run_faft_step({
- 'userspace_action': self.faft_client.system.set_try_fw_b,
- })
+ self.faft_client.system.set_try_fw_b()
else:
if not self.checkers.crossystem_checker({'tried_fwb': '0'}):
logging.info(
'Firmware is booted with tried_fwb. Reboot to clear.')
- self.run_faft_step({})
def power_on(self):
"""Switch DUT AC power on."""
@@ -1154,7 +1083,7 @@
self.servo.custom_recovery_mode()
else:
self.servo.enable_recovery_mode()
- self.cold_reboot()
+ self.reboot_cold_trigger()
time.sleep(self.faft_config.ec_boot_to_console)
self.servo.disable_recovery_mode()
@@ -1212,7 +1141,7 @@
# issue is resolved.
if self.faft_config.platform == 'Parrot':
self.wait_for_client_offline()
- self.cold_reboot()
+ self.reboot_cold_trigger()
def disable_keyboard_dev_mode(self):
"""Disable keyboard controlled developer mode"""
@@ -1220,7 +1149,7 @@
if (not self.faft_config.chrome_ec and
not self.faft_config.broken_rec_mode):
self.servo.disable_recovery_mode()
- self.cold_reboot()
+ self.reboot_cold_trigger()
self.wait_for_client_offline()
self.wait_fw_screen_and_switch_keyboard_dev_mode(dev=False)
@@ -1232,11 +1161,6 @@
@param dev_mode: True if requested in dev mode; False if normal mode.
"""
- # Change the default firmware_action for dev mode passing the fw screen.
- self.register_faft_template({
- 'firmware_action': (self.wait_dev_screen_and_ctrl_d if dev_mode
- else None),
- })
if dev_mode:
if (not self.faft_config.keyboard_dev and
not self.checkers.crossystem_checker({'devsw_cur': '1'})):
@@ -1247,13 +1171,11 @@
logging.info('System is not in dev mode. Reboot into it.')
if self._backup_dev_mode is None:
self._backup_dev_mode = False
- self.run_faft_step({
- 'userspace_action': None if self.faft_config.keyboard_dev
- else (self.faft_client.system.run_shell_command,
- 'chromeos-firmwareupdate --mode todev && reboot'),
- 'reboot_action': self.enable_keyboard_dev_mode if
- self.faft_config.keyboard_dev else None,
- })
+ if self.faft_config.keyboard_dev:
+ self.faft_client.system.run_shell_command(
+ 'chromeos-firmwareupdate --mode todev && reboot')
+ self.do_reboot_action(self.enable_keyboard_dev_mode)
+ # TOOD : Add a ctrl_d to speed through the dev screen.
else:
if (not self.faft_config.keyboard_dev and
not self.checkers.crossystem_checker({'devsw_cur': '0'})):
@@ -1264,13 +1186,10 @@
logging.info('System is not in normal mode. Reboot into it.')
if self._backup_dev_mode is None:
self._backup_dev_mode = True
- self.run_faft_step({
- 'userspace_action': None if self.faft_config.keyboard_dev
- else (self.faft_client.system.run_shell_command,
- 'chromeos-firmwareupdate --mode tonormal && reboot'),
- 'reboot_action': self.disable_keyboard_dev_mode if
- self.faft_config.keyboard_dev else None,
- })
+ if self.faft_config.keyboard_dev:
+ self.faft_client.system.run_shell_command(
+ 'chromeos-firmwareupdate --mode tonormal && reboot')
+ self.do_reboot_action(self.disable_keyboard_dev_mode)
def restore_dev_mode(self):
"""Restores original dev mode status if it has changed."""
@@ -1289,10 +1208,8 @@
flags = self.faft_client.bios.get_preamble_flags(section)
if flags & vboot.PREAMBLE_USE_RO_NORMAL:
flags = flags ^ vboot.PREAMBLE_USE_RO_NORMAL
- self.run_faft_step({
- 'userspace_action': (self.faft_client.bios.set_preamble_flags,
- (section, flags))
- })
+ self.faft_client.bios.set_preamble_flags(section, flags)
+ self.reboot_warm()
def setup_kernel(self, part):
"""Setup for kernel test.
@@ -1328,7 +1245,77 @@
self.faft_client.system.run_shell_command('cgpt prioritize -i%s %s' %
(self.KERNEL_MAP[part], root_dev))
- def warm_reboot(self):
+
+ ################################################
+ # Reboot APIs
+
+ def reboot_warm(self, sync_before_boot=True, wait_for_dut_up=True):
+ """
+ Perform a warm reboot.
+
+ This is the highest level function that most users will need.
+ It performs a sync, triggers a reboot and waits for kernel to boot.
+
+ @param sync_before_boot: bool, sync to disk before booting.
+ @param wait_for_dut_up: bool, wait for dut to boot before returning.
+ """
+ if sync_before_boot:
+ self.faft_client.system.run_shell_command('sync')
+ time.sleep(self.faft_config.sync)
+ self.reboot_warm_trigger()
+ if wait_for_dut_up:
+ self.wait_for_client_offline()
+ self.wait_for_kernel_up()
+
+ def reboot_cold(self, sync_before_boot=True, wait_for_dut_up=True):
+ """
+ Perform a cold reboot.
+
+ This is the highest level function that most users will need.
+ It performs a sync, triggers a reboot and waits for kernel to boot.
+
+ @param sync_before_boot: bool, sync to disk before booting.
+ @param wait_for_dut_up: bool, wait for dut to boot before returning.
+ """
+ if sync_before_boot:
+ self.faft_client.system.run_shell_command('sync')
+ time.sleep(self.faft_config.sync)
+ self.reboot_cold_trigger()
+ if wait_for_dut_up:
+ self.wait_for_client_offline()
+ self.wait_for_kernel_up()
+
+ def do_reboot_action(self, func):
+ """
+ Helper function that wraps the reboot function so that we check if the
+ DUT went down.
+
+ @param func: function to trigger the reboot.
+ """
+ logging.info("-[FAFT]-[ start do_reboot_action ]----------")
+ boot_id = self.get_bootid()
+ self._call_action(func)
+ self.wait_for_client_offline(orig_boot_id=boot_id)
+ logging.info("-[FAFT]-[ end do_reboot_action ]------------")
+
+ def wait_for_kernel_up(self, install_deps=False):
+ """
+ Helper function that waits for the device to boot up to kernel.
+
+ @param install_deps: bool, install deps after boot.
+ """
+ logging.info("-[FAFT]-[ start wait_for_kernel_up ]---")
+ try:
+ logging.info("Installing deps after boot : %s" % install_deps)
+ self.wait_for_client(install_deps=install_deps)
+ # Stop update-engine as it may change firmware/kernel.
+ self.stop_service('update-engine')
+ except ConnectionError:
+ logging.error('wait_for_client() timed out.')
+ self._restore_routine_from_timeout(next_step)
+ logging.info("-[FAFT]-[ end wait_for_kernel_up ]-----")
+
+ def reboot_warm_trigger(self):
"""Request a warm reboot.
A wrapper for underlying servo warm reset.
@@ -1336,11 +1323,11 @@
# Use cold reset if the warm reset is broken.
if self.faft_config.broken_warm_reset:
logging.info('broken_warm_reset is True. Cold rebooting instead.')
- self.cold_reboot()
+ self.reboot_cold_trigger()
else:
self.servo.get_power_state_controller().warm_reset()
- def cold_reboot(self):
+ def reboot_cold_trigger(self):
"""Request a cold reboot.
A wrapper for underlying servo cold reset.
@@ -1361,7 +1348,7 @@
"""
self.faft_client.system.run_shell_command('sync')
time.sleep(self.faft_config.sync)
- self.warm_reboot()
+ self.reboot_warm_trigger()
def sync_and_cold_reboot(self):
"""Request the client sync and do a cold reboot.
@@ -1370,7 +1357,7 @@
"""
self.faft_client.system.run_shell_command('sync')
time.sleep(self.faft_config.sync)
- self.cold_reboot()
+ self.reboot_cold_trigger()
def sync_and_ec_reboot(self, flags=''):
"""Request the client sync and do a EC triggered reboot.
@@ -1402,7 +1389,7 @@
self.enable_rec_mode_and_reboot()
self.wait_fw_screen_and_plug_usb()
time.sleep(self.faft_config.install_shim_done)
- self.warm_reboot()
+ self.reboot_warm_trigger()
def full_power_off_and_on(self):
"""Shutdown the device by pressing power button and power on again."""
@@ -1550,59 +1537,11 @@
if post_power_action:
self._call_action(post_power_action)
- def register_faft_template(self, template):
- """Register FAFT template, the default FAFT_STEP of each step.
-
- Any missing field falls back to the original faft_template.
-
- @param template: A FAFT_STEP dict.
+ def get_bootid(self, retry=3):
"""
- self._faft_template.update(template)
-
- def register_faft_sequence(self, sequence):
- """Register FAFT sequence.
-
- @param sequence: A FAFT_SEQUENCE array which consisted of FAFT_STEP
- dicts.
+ Return the bootid.
"""
- self._faft_sequence = sequence
-
- def run_faft_step(self, step, no_reboot=False, next_step=None):
- """Run a single FAFT step.
-
- Any missing field falls back to faft_template. An empty step means
- running the default faft_template.
-
- @param step: A FAFT_STEP dict of this step to run.
- @param next_step: A FAFT_STEP dict of next step.
- @param no_reboot: True to prevent running reboot_action and
- firmware_action.
- @parm next_step: Optional, a FAFT_STEP dict of the next step, which is
- used for diagnostic.
- @raise TestError: An error when the given step is not valid.
- @raise TestFail: Test failed in waiting DUT reboot.
- """
- FAFT_STEP_KEYS = ('state_checker', 'userspace_action', 'reboot_action',
- 'firmware_action', 'install_deps_after_boot')
-
- test = {}
- test.update(self._faft_template)
- test.update(step)
-
- for key in test:
- if key not in FAFT_STEP_KEYS:
- raise error.TestError('Invalid key in FAFT step: %s', key)
-
- # Record the UART output regularly.
- self.record_uart_capture()
-
- if test['state_checker']:
- logging.info("-[FAFT]-[ start stepstate_checker ]----------")
- self._call_action(test['state_checker'], check_status=True)
- logging.info("-[FAFT]-[ end state_checker ]----------------")
-
boot_id = None
- retry = 3
while retry:
try:
boot_id = self._client.get_boot_id()
@@ -1614,46 +1553,30 @@
else:
logging.warning('Failed to get boot_id.')
logging.info('boot_id: %s', boot_id)
+ return boot_id
- logging.info("-[FAFT]-[ start userspace_action ]----------")
- self._call_action(test['userspace_action'])
- logging.info("-[FAFT]-[ end userspace_action ]------------")
+ def check_state(self, func):
+ """
+ Wrapper around _call_action with check_status set to True. This is a
+ helper function to be used by tests and is currently implemented by
+ calling _call_action with check_status=True.
- # Don't run reboot_action and firmware_action if no_reboot is True.
- if not no_reboot:
- logging.info("-[FAFT]-[ start reboot_action ]----------")
- self._call_action(test['reboot_action'])
- logging.info("-[FAFT]-[ end reboot_action ]------------")
- self.wait_for_client_offline(orig_boot_id=boot_id)
- logging.info("-[FAFT]-[ start firmware_action ]----------")
- self._call_action(test['firmware_action'])
- logging.info("-[FAFT]-[ end firmware_action ]------------")
+ TODO: This function's arguments need to be made more stringent. And
+ its functionality should be moved over to check functions directly in
+ the future.
- try:
- if 'install_deps_after_boot' in test:
- logging.info("-[FAFT]-[ start install_deps_after_boot ]---")
- self.wait_for_client(
- install_deps=test['install_deps_after_boot'])
- logging.info("-[FAFT]-[ end install_deps_after_boot ]-----")
- else:
- self.wait_for_client()
- # Stop update-engine as it may change firmware/kernel.
- self.stop_service('update-engine')
- except ConnectionError:
- logging.error('wait_for_client() timed out.')
- self._restore_routine_from_timeout(next_step)
-
- def run_faft_sequence(self):
- """Run FAFT sequence which was previously registered."""
- sequence = self._faft_sequence
- for index, step in enumerate(sequence):
- logging.info('======== Running FAFT sequence step %d ========',
- index + 1)
- # Don't reboot in the last step.
- if index == len(sequence) - 1:
- self.run_faft_step(step, no_reboot=True)
- else:
- self.run_faft_step(step, next_step=sequence[index + 1])
+ @param func: A function, or a tuple (function, args, error_msg),
+ in which, args and error_msg are optional. args is
+ either a value or a tuple if multiple arguments.
+ This can also be a list containing multiple
+ function or tuple. In this case, these actions are
+ called in sequence.
+ @return: The result value of the action function.
+ @raise TestFail: If the function does notsucceed.
+ """
+ logging.info("-[FAFT]-[ start stepstate_checker ]----------")
+ self._call_action(func, check_status=True)
+ logging.info("-[FAFT]-[ end state_checker ]----------------")
def get_current_firmware_sha(self):
"""Get current firmware sha of body and vblock.