Tom Wai-Hong Tam | a70f0fe | 2011-09-02 18:28:47 +0800 | [diff] [blame^] | 1 | # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
| 2 | # Use of this source code is governed by a BSD-style license that can be |
| 3 | # found in the LICENSE file. |
| 4 | |
| 5 | import logging |
| 6 | import re |
| 7 | import time |
| 8 | import xmlrpclib |
| 9 | |
| 10 | from autotest_lib.client.common_lib import error |
| 11 | from autotest_lib.server.cros.servotest import ServoTest |
| 12 | |
| 13 | |
| 14 | class FAFTSequence(ServoTest): |
| 15 | """ |
| 16 | The base class of Fully Automated Firmware Test Sequence. |
| 17 | |
| 18 | Many firmware tests require several reboot cycles and verify the resulting |
| 19 | system states. To do that, an Autotest test case must handle every action |
| 20 | of each step in detail, which makes the test case hard to read and leads |
| 21 | to duplicated code. The base class FAFTSequence solves this problem. |
| 22 | |
| 23 | The actions of one reboot cycle are defined in a dict, namely FAFT_STEP. |
| 24 | There are four functions in the FAFT_STEP dict: |
| 25 | state_checker: a function to check whether the current state is valid, |
| 26 | returning True if valid; otherwise, False to break the whole |
| 27 | test sequence. |
| 28 | userspace_action: a function to describe the action ran in userspace. |
| 29 | reboot_action: a function to do reboot, default: software_reboot. |
| 30 | firmware_action: a function to describe the action ran after reboot. |
| 31 | |
| 32 | The default FAFT_STEP checks nothing in state_checker and does nothing in |
| 33 | userspace_action and firmware_action. Its reboot_action is simply a |
| 34 | software reboot. You can change the default FAFT_STEP by calling |
| 35 | self.register_faft_template(FAFT_STEP). |
| 36 | |
| 37 | A FAFT test case consists of several FAFT_STEP's, namely FAFT_SEQUENCE. |
| 38 | FAFT_SEQUENCE is an array of FAFT_STEP's. Any missing fields on FAFT_STEP |
| 39 | fall back to default. |
| 40 | |
| 41 | In the run_once(), it should register and run FAFT_SEQUENCE like: |
| 42 | def run_once(self): |
| 43 | self.register_faft_sequence(FAFT_SEQUENCE) |
| 44 | self.run_faft_sequence() |
| 45 | |
| 46 | Note that in the last step, we only run state_checker and |
| 47 | userspace_action. The reboot_action and firmware_action are not executed. |
| 48 | |
| 49 | Attributes: |
| 50 | _faft_template: The default FAFT_STEP of each step. The actions would |
| 51 | be over-written if the registered FAFT_SEQUENCE is valid. |
| 52 | _faft_sequence: The registered FAFT_SEQUENCE. |
| 53 | """ |
# Version number of this test class.
version = 1

# The registered FAFT_SEQUENCE; empty until register_faft_sequence() is called.
_faft_sequence = ()
# The default FAFT_STEP; unset until register_faft_template() is called.
_faft_template = None
| 58 | |
| 59 | |
def setup(self):
    """Autotest setup hook: verify FAFT is enabled and install defaults.

    Runs the parent class setup first, then registers the default
    FAFT_STEP template: no state checker, no userspace action, no
    firmware action, and self.faft_client.software_reboot as the reboot
    action.

    Raises:
      error.TestError: If self._remote_infos['faft']['used'] is false.
    """
    super(FAFTSequence, self).setup()

    faft_enabled = self._remote_infos['faft']['used']
    if not faft_enabled:
        raise error.TestError('The use_faft flag should be enabled.')

    default_step = dict(
        state_checker=None,
        userspace_action=None,
        reboot_action=self.faft_client.software_reboot,
        firmware_action=None,
    )
    self.register_faft_template(default_step)
| 71 | |
| 72 | |
def cleanup(self):
    """Autotest cleanup hook.

    Drops the registered template and sequence before delegating to the
    parent class cleanup.
    """
    self._faft_template, self._faft_sequence = None, ()
    super(FAFTSequence, self).cleanup()
| 78 | |
| 79 | |
| 80 | def _parse_crossystem_output(self, lines): |
| 81 | """Parse the crossystem output into a dict. |
| 82 | |
| 83 | Args: |
| 84 | lines: The list of crossystem output strings. |
| 85 | |
| 86 | Returns: |
| 87 | A dict which contains the crossystem keys/values. |
| 88 | |
| 89 | Raises: |
| 90 | error.TestError: If wrong format in crossystem output. |
| 91 | |
| 92 | >>> seq = FAFTSequence() |
| 93 | >>> seq._parse_crossystem_output([ \ |
| 94 | "arch = x86 # Platform architecture", \ |
| 95 | "cros_debug = 1 # OS should allow debug", \ |
| 96 | ]) |
| 97 | {'cros_debug': '1', 'arch': 'x86'} |
| 98 | >>> seq._parse_crossystem_output([ \ |
| 99 | "arch=x86", \ |
| 100 | ]) |
| 101 | Traceback (most recent call last): |
| 102 | ... |
| 103 | TestError: Failed to parse crossystem output: arch=x86 |
| 104 | >>> seq._parse_crossystem_output([ \ |
| 105 | "arch = x86 # Platform architecture", \ |
| 106 | "arch = arm # Platform architecture", \ |
| 107 | ]) |
| 108 | Traceback (most recent call last): |
| 109 | ... |
| 110 | TestError: Duplicated crossystem key: arch |
| 111 | """ |
| 112 | pattern = "^([^ =]*) *= *(.*[^ ]) *# [^#]*$" |
| 113 | parsed_list = {} |
| 114 | for line in lines: |
| 115 | matched = re.match(pattern, line.strip()) |
| 116 | if not matched: |
| 117 | raise error.TestError("Failed to parse crossystem output: %s" |
| 118 | % line) |
| 119 | (name, value) = (matched.group(1), matched.group(2)) |
| 120 | if name in parsed_list: |
| 121 | raise error.TestError("Duplicated crossystem key: %s" % name) |
| 122 | parsed_list[name] = value |
| 123 | return parsed_list |
| 124 | |
| 125 | |
| 126 | def crossystem_checker(self, expected_dict): |
| 127 | """Check the crossystem values matched. |
| 128 | |
| 129 | Given an expect_dict which describes the expected crossystem values, |
| 130 | this function check the current crossystem values are matched or not. |
| 131 | |
| 132 | Args: |
| 133 | expected_dict: A dict which contains the expected values. |
| 134 | |
| 135 | Returns: |
| 136 | True if the crossystem value matched; otherwise, False. |
| 137 | """ |
| 138 | lines = self.faft_client.run_shell_command_get_output('crossystem') |
| 139 | got_dict = self._parse_crossystem_output(lines) |
| 140 | for key in expected_dict: |
| 141 | if key not in got_dict: |
| 142 | logging.info('Expected key "%s" not in crossystem result' % key) |
| 143 | return False |
| 144 | if isinstance(expected_dict[key], str): |
| 145 | if got_dict[key] != expected_dict[key]: |
| 146 | logging.info("Expected '%s' value '%s' but got '%s'" % |
| 147 | (key, expected_dict[key], got_dict[key])) |
| 148 | return False |
| 149 | elif isinstance(expected_dict[key], tuple): |
| 150 | # Expected value is a tuple of possible actual values. |
| 151 | if got_dict[key] not in expected_dict[key]: |
| 152 | logging.info("Expected '%s' values %s but got '%s'" % |
| 153 | (key, str(expected_dict[key]), got_dict[key])) |
| 154 | return False |
| 155 | else: |
| 156 | logging.info("The expected_dict is neither a str nor a dict.") |
| 157 | return False |
| 158 | return True |
| 159 | |
| 160 | |
| 161 | def _str_action(self, action): |
| 162 | """Convert the action function into a readable string. |
| 163 | |
| 164 | The simple str() doesn't work on remote objects since we disable |
| 165 | allow_dotted_names flag when we launch the SimpleXMLRPCServer. |
| 166 | So this function handles the exception in this case. |
| 167 | |
| 168 | Args: |
| 169 | action: A function. |
| 170 | |
| 171 | Returns: |
| 172 | A readable string. |
| 173 | """ |
| 174 | try: |
| 175 | return str(action) |
| 176 | except xmlrpclib.Fault: |
| 177 | return '<remote method>' |
| 178 | |
| 179 | |
| 180 | def _call_action(self, action_tuple): |
| 181 | """Call the action function with/without arguments. |
| 182 | |
| 183 | Args: |
| 184 | action_tuple: A function, or a tuple which consisted of a function |
| 185 | and its arguments (if any). |
| 186 | |
| 187 | Returns: |
| 188 | The result value of the action function. |
| 189 | """ |
| 190 | if isinstance(action_tuple, tuple): |
| 191 | action = action_tuple[0] |
| 192 | args = action_tuple[1:] |
| 193 | if callable(action): |
| 194 | logging.info('calling %s with parameter %s' % ( |
| 195 | self._str_action(action), str(action_tuple[1]))) |
| 196 | return action(*args) |
| 197 | else: |
| 198 | logging.info('action is not callable!') |
| 199 | else: |
| 200 | action = action_tuple |
| 201 | if action is not None: |
| 202 | if callable(action): |
| 203 | logging.info('calling %s' % self._str_action(action)) |
| 204 | return action() |
| 205 | else: |
| 206 | logging.info('action is not callable!') |
| 207 | |
| 208 | return None |
| 209 | |
| 210 | |
| 211 | def register_faft_template(self, template): |
| 212 | """Register FAFT template, the default FAFT_STEP of each step. |
| 213 | |
| 214 | Args: |
| 215 | template: A FAFT_STEP dict. |
| 216 | """ |
| 217 | self._faft_template = template |
| 218 | |
| 219 | |
| 220 | def register_faft_sequence(self, sequence): |
| 221 | """Register FAFT sequence. |
| 222 | |
| 223 | Args: |
| 224 | sequence: A FAFT_SEQUENCE array which consisted of FAFT_STEP dicts. |
| 225 | """ |
| 226 | self._faft_sequence = sequence |
| 227 | |
| 228 | |
| 229 | def run_faft_sequence(self): |
| 230 | """Run FAFT sequence. |
| 231 | |
| 232 | Raises: |
| 233 | error.TestFail: An error when the test failed. |
| 234 | """ |
| 235 | default_test = self._faft_template |
| 236 | sequence = self._faft_sequence |
| 237 | |
| 238 | for test in sequence: |
| 239 | cur_test = {} |
| 240 | cur_test.update(default_test) |
| 241 | cur_test.update(test) |
| 242 | |
| 243 | self.wait_for_client() |
| 244 | if cur_test['state_checker']: |
| 245 | if not self._call_action(cur_test['state_checker']): |
| 246 | raise error.TestFail('State checker failed!') |
| 247 | |
| 248 | self._call_action(cur_test['userspace_action']) |
| 249 | |
| 250 | # Don't run reboot_action and firmware_action of the last step. |
| 251 | if test is not sequence[-1]: |
| 252 | self._call_action(cur_test['reboot_action']) |
| 253 | self.wait_for_client_offline() |
| 254 | self._call_action(cur_test['firmware_action']) |