| # Copyright 2015 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import re |
| import logging |
| import time |
| |
| from autotest_lib.client.common_lib import error |
| |
| class PDConsoleUtils(object): |
| """ Provides a set of methods common to USB PD FAFT tests |
| |
| Each instance of this class is associated with a particular |
| servo UART console. USB PD tests will typically use the console |
| command 'pd' and its subcommands to control/monitor Type C PD |
| connections. The servo object used for UART operations is |
| passed in and stored when this object is created. |
| |
| """ |
| |
| SRC_CONNECT = 'SRC_READY' |
| SNK_CONNECT = 'SNK_READY' |
| SRC_DISC = 'SRC_DISCONNECTED' |
| SNK_DISC = 'SNK_DISCONNECTED' |
| DRP_AUTO_TOGGLE = 'DRP_AUTO_TOGGLE' |
| PD_MAX_PORTS = 2 |
| CONNECT_TIME = 4 |
| |
| DUALROLE_QUERY_DELAY = 0.25 |
| # Dualrole input/ouput values of methods in this class. |
| DUALROLE_VALUES = ['on', 'off', 'snk', 'src'] |
| # Strings passing to the console command "pd dualrole" |
| DUALROLE_CMD_ARGS = ['on', 'off', 'sink', 'source'] |
| # Strings returned from the console command "pd dualrole" |
| DUALROLE_CMD_RESULTS = ['on', 'off', 'force sink', 'force source'] |
| |
| # Some old firmware uses a single dualrole setting for all ports; while |
| # some new firmware uses a per port dualrole settting. This flag will be |
| # initialized to True or False. |
| # TODO: Remove this flag when the old setting phases out |
| per_port_dualrole_setting = None |
| |
| # Dictionary for 'pd 0/1 state' parsing |
| PD_STATE_DICT = { |
| 'port': 'Port\s+([\w]+)', |
| 'role': 'Role:\s+([\w]+-[\w]+)', |
| 'pd_state': 'State:\s+([\w]+_[\w]+)', |
| 'flags': 'Flags:\s+([\w]+)', |
| 'polarity': '(CC\d)' |
| } |
| |
| # Dictionary for PD control message types |
| PD_CONTROL_MSG_MASK = 0x1f |
| PD_CONTROL_MSG_DICT = { |
| 'GoodCRC': 1, |
| 'GotoMin': 2, |
| 'Accept': 3, |
| 'Reject': 4, |
| 'Ping': 5, |
| 'PS_RDY': 6, |
| 'Get_Source_Cap': 7, |
| 'Get_Sink_Cap': 8, |
| 'DR_Swap': 9, |
| 'PR_Swap': 10, |
| 'VCONN_Swap': 11, |
| 'Wait': 12, |
| 'Soft_Reset': 13 |
| } |
| |
| # Dictionary for PD firmware state flags |
| PD_STATE_FLAGS_DICT = { |
| 'power_swap': 1 << 1, |
| 'data_swap': 1 << 2, |
| 'data_swap_active': 1 << 3, |
| 'vconn_on': 1 << 12 |
| } |
| |
| def __init__(self, console): |
| """Console can be either usbpd, ec, or pdtester UART |
| This object with then be used by the class which creates |
| the PDConsoleUtils class to send/receive commands to UART |
| """ |
| # save console for UART access functions |
| self.console = console |
| |
| def send_pd_command(self, cmd): |
| """Send command to PD console UART |
| |
| @param cmd: pd command string |
| """ |
| self.console.send_command(cmd) |
| |
| def send_pd_command_get_output(self, cmd, regexp): |
| """Send command to PD console, wait for response |
| |
| @param cmd: pd command string |
| @param regexp: regular expression for desired output |
| """ |
| # Enable PD console debug mode to show control messages |
| self.enable_pd_console_debug() |
| output = self.console.send_command_get_output(cmd, regexp) |
| self.disable_pd_console_debug() |
| return output |
| |
| def send_pd_command_get_reply_msg(self, cmd): |
| """Send PD protocol msg, get PD control msg reply |
| |
| The PD console debug mode is enabled prior to sending |
| a pd protocol message. This allows the |
| control message reply to be extracted. The debug mode |
| is disabled prior to exiting. |
| |
| @param cmd: pd command to issue to the UART console |
| |
| @returns: PD control header message |
| """ |
| m = self.send_pd_command_get_output(cmd, ['RECV\s([\w]+)\W']) |
| ctrl_msg = int(m[0][1], 16) & self.PD_CONTROL_MSG_MASK |
| return ctrl_msg |
| |
| def verify_pd_console(self): |
| """Verify that PD commands exist on UART console |
| |
| Send 'help' command to UART console |
| @returns: True if 'pd' is found, False if not |
| """ |
| |
| l = self.console.send_command_get_output('help', ['(pd)\s+([\w]+)']) |
| if l[0][1] == 'pd': |
| return True |
| else: |
| return False |
| |
| def execute_pd_state_cmd(self, port): |
| """Get PD state for specified channel |
| |
| pd 0/1 state command gives produces 5 fields. The full response |
| line is captured and then parsed to extract each field to fill |
| the dict containing port, polarity, role, pd_state, and flags. |
| |
| @param port: Type C PD port 0 or 1 |
| |
| @returns: A dict with the 5 fields listed above |
| """ |
| cmd = 'pd' |
| subcmd = 'state' |
| pd_cmd = cmd +" " + str(port) + " " + subcmd |
| # Two FW versions for this command, get full line. |
| m = self.send_pd_command_get_output(pd_cmd, |
| ['(Port.*) - (Role:.*)\n']) |
| |
| # Extract desired values from result string |
| state_result = {} |
| for key, regexp in self.PD_STATE_DICT.iteritems(): |
| value = re.search(regexp, m[0][0]) |
| if value: |
| state_result[key] = value.group(1) |
| else: |
| raise error.TestFail('pd 0/1 state: %r value not found' % (key)) |
| |
| return state_result |
| |
| def get_pd_state(self, port): |
| """Get the current PD state |
| |
| @param port: Type C PD port 0/1 |
| @returns: current pd state |
| """ |
| |
| pd_dict = self.execute_pd_state_cmd(port) |
| return pd_dict['pd_state'] |
| |
| def get_pd_port(self, port): |
| """Get the current PD port |
| |
| @param port: Type C PD port 0/1 |
| @returns: current pd state |
| """ |
| pd_dict = self.execute_pd_state_cmd(port) |
| return pd_dict['port'] |
| |
| def get_pd_role(self, port): |
| """Get the current PD power role (source or sink) |
| |
| @param port: Type C PD port 0/1 |
| @returns: current pd state |
| """ |
| pd_dict = self.execute_pd_state_cmd(port) |
| return pd_dict['role'] |
| |
| def get_pd_flags(self, port): |
| """Get the current PD flags |
| |
| @param port: Type C PD port 0/1 |
| @returns: current pd state |
| """ |
| pd_dict = self.execute_pd_state_cmd(port) |
| return pd_dict['flags'] |
| |
| def get_pd_dualrole(self, port): |
| """Get the current PD dualrole setting |
| |
| @param port: Type C PD port 0/1 |
| @returns: current PD dualrole setting, one of (on, off, snk, src) |
| """ |
| if self.per_port_dualrole_setting is True: |
| cmd = 'pd %d dualrole' % port |
| elif self.per_port_dualrole_setting is False: |
| cmd = 'pd dualrole' |
| else: |
| try: |
| logging.info('The per_port_dualrole_setting is unknown; ' |
| 'try the True case') |
| self.per_port_dualrole_setting = True |
| return self.get_pd_dualrole(port) |
| except: |
| logging.info('The per_port_dualrole_setting=True failed; ' |
| 'try the False case') |
| self.per_port_dualrole_setting = False |
| return self.get_pd_dualrole(port) |
| |
| m = self.send_pd_command_get_output(cmd, |
| ['dual-role toggling:\s+([\w ]+)[\r\n]']) |
| # Find the index according to the output of "pd dualrole" command |
| dual_index = self.DUALROLE_CMD_RESULTS.index(m[0][1]) |
| # Map to a string which is the output of this method |
| return self.DUALROLE_VALUES[dual_index] |
| |
| def set_pd_dualrole(self, port, value): |
| """Set pd dualrole |
| |
| It can be set to either: |
| 1. on |
| 2. off |
| 3. snk (force sink mode) |
| 4. src (force source mode) |
| After setting, the current value is read to confirm that it |
| was set properly. |
| |
| @param port: Type C PD port 0/1 |
| @param value: One of the 4 options listed |
| """ |
| # If the dualrole setting is not initialized, call the get method to |
| # initialize it. |
| if self.per_port_dualrole_setting is None: |
| self.get_pd_dualrole(port) |
| |
| # Get string required for console command |
| dual_index = self.DUALROLE_VALUES.index(value) |
| # Create console command |
| cmd = 'pd %d dualrole %s' % (port, self.DUALROLE_CMD_ARGS[dual_index]) |
| self.console.send_command(cmd) |
| time.sleep(self.DUALROLE_QUERY_DELAY) |
| # Get current setting to verify that command was successful |
| dual = self.get_pd_dualrole(port) |
| # If it doesn't match, then raise error |
| if dual != value: |
| raise error.TestFail("dualrole error: " + value + " != " + dual) |
| |
| def query_pd_connection(self): |
| """Determine if PD connection is present |
| |
| Try the 'pd 0/1 state' command and see if it's in either |
| expected state of a conneciton. Record the port number |
| that has an active connection |
| |
| @returns: dict with params port, connect, and state |
| """ |
| status = {} |
| port = 0; |
| status['connect'] = False |
| status['port'] = port |
| state = self.get_pd_state(port) |
| # Check port 0 first |
| if state == self.SRC_CONNECT or state == self.SNK_CONNECT: |
| status['connect'] = True |
| status['role'] = state |
| else: |
| port = 1 |
| status['port'] = port |
| state = self.get_pd_state(port) |
| # Check port 1 |
| if state == self.SRC_CONNECT or state == self.SNK_CONNECT: |
| status['connect'] = True |
| status['role'] = state |
| |
| return status |
| |
| def swap_power_role(self, port): |
| """Attempt a power role swap |
| |
| This method attempts to execute a power role swap. A check |
| is made to ensure that dualrole mode is enabled and that |
| a PD contract is currently established. If both checks pass, |
| then the power role swap command is issued. After a delay, |
| if a PD contract is established and the current state does |
| not equal the starting state, then it was successful. |
| |
| @param port: pd port number |
| |
| @returns: True if power swap is successful, False otherwise. |
| """ |
| # Get starting state |
| if self.is_pd_dual_role_enabled(port) == False: |
| logging.info('Dualrole Mode not enabled!') |
| return False |
| if self.is_pd_connected(port) == False: |
| logging.info('PD contract not established!') |
| return False |
| current_pr = self.get_pd_state(port) |
| swap_cmd = 'pd %d swap power' % port |
| self.send_pd_command(swap_cmd) |
| time.sleep(self.CONNECT_TIME) |
| new_pr = self.get_pd_state(port) |
| logging.info('Power swap: %s -> %s', current_pr, new_pr) |
| if self.is_pd_connected(port) == False: |
| return False |
| return bool(current_pr != new_pr) |
| |
| def disable_pd_console_debug(self): |
| """Turn off PD console debug |
| |
| """ |
| cmd = 'pd dump 0' |
| self.send_pd_command(cmd) |
| |
| def enable_pd_console_debug(self): |
| """Enable PD console debug level 1 |
| |
| """ |
| cmd = 'pd dump 2' |
| self.send_pd_command(cmd) |
| |
| def is_pd_flag_set(self, port, key): |
| """Test a bit in PD protocol state flags |
| |
| The flag word contains various PD protocol state information. |
| This method allows for a specific flag to be tested. |
| |
| @param port: Port which has the active PD connection |
| @param key: dict key to retrieve the flag bit mapping |
| |
| @returns True if the bit to be tested is set |
| """ |
| pd_flags = self.get_pd_flags(port) |
| return bool(self.PD_STATE_FLAGS_DICT[key] & int(pd_flags, 16)) |
| |
| def is_pd_connected(self, port): |
| """Check if a PD connection is active |
| |
| @param port: port to be used for pd console commands |
| |
| @returns True if port is in connected state |
| """ |
| state = self.get_pd_state(port) |
| return bool(state == self.SRC_CONNECT or state == self.SNK_CONNECT) |
| |
| def is_pd_dual_role_enabled(self, port): |
| """Check if a PD device is in dualrole mode |
| |
| @returns True is dualrole mode is active, false otherwise |
| """ |
| drp = self.get_pd_dualrole(port) |
| return bool(drp == 'on') |
| |
| |
| class PDConnectionUtils(PDConsoleUtils): |
| """Provides a set of methods common to USB PD FAFT tests |
| |
| This class is used for PD utility methods that require access |
| to both PDTester and DUT PD consoles. |
| |
| """ |
| |
| def __init__(self, dut_console, pdtester_console): |
| """ |
| @param dut_console: PD console object for DUT |
| @param pdtester_console: PD console object for PDTester |
| """ |
| # save console for DUT PD UART access functions |
| self.dut_console = dut_console |
| # save console for PDTester UART access functions |
| self.pdtester_console = pdtester_console |
| super(PDConnectionUtils, self).__init__(dut_console) |
| |
| def _verify_pdtester_connection(self, port): |
| """Verify DUT to PDTester PD connection |
| |
| This method checks for a PDTester PD connection for the |
| given port by first verifying if a PD connection is present. |
| If found, then it uses a PDTester feature to force a PD disconnect. |
| If the port is no longer in the connected state, and following |
| a delay, is found to be back in the connected state, then |
| a DUT pd to PDTester connection is verified. |
| |
| @param port: DUT pd port to test |
| |
| @returns True if DUT to PDTester pd connection is verified |
| """ |
| DISCONNECT_CHECK_TIME = 0.5 |
| DISCONNECT_TIME_SEC = 2 |
| # pdtester console command to force PD disconnect |
| disc_cmd = 'fakedisconnect 100 %d' % (DISCONNECT_TIME_SEC * 1000) |
| # Only check for PDTester if DUT has active PD connection |
| if self.dut_console.is_pd_connected(port): |
| # Attempt to force PD disconnection |
| self.pdtester_console.send_pd_command(disc_cmd) |
| time.sleep(DISCONNECT_CHECK_TIME) |
| # Verify that DUT PD port is no longer connected |
| if self.dut_console.is_pd_connected(port) == False: |
| # Wait for disconnect timer and give time to reconnect |
| time.sleep(self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC) |
| if self.dut_console.is_pd_connected(port): |
| logging.info('PDTester connection verified on port %d', |
| port) |
| return True |
| else: |
| # Could have disconnected other port, allow it to reconnect |
| # before exiting. |
| time.sleep(self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC) |
| return False |
| |
| def find_dut_to_pdtester_connection(self): |
| """Find the PD port which is connected to PDTester |
| |
| @returns DUT pd port number if found, None otherwise |
| """ |
| for port in xrange(self.dut_console.PD_MAX_PORTS): |
| # Check for DUT to PDTester connection on port |
| if self._verify_pdtester_connection(port): |
| # PDTester PD connection found so exit |
| return port |
| return None |