blob: b7982124557f4df43a2a651966062109f5998f4e [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4
Scott07a848f2016-01-12 15:04:52 -08005import re
Scottfe06ed82015-11-05 17:15:01 -08006import time
7
8from autotest_lib.client.common_lib import error
9
class PDConsoleUtils(object):
    """Provides a set of methods common to USB PD FAFT tests.

    Each instance of this class is associated with a particular
    servo UART console. USB PD tests will typically use the console
    command 'pd' and its subcommands to control/monitor Type C PD
    connections. The console object used for UART operations is
    passed in and stored when this object is created.

    The console object must provide send_command(cmd) and
    send_command_get_output(cmd, regexp_list); no other console
    methods are used by this class.
    """

    # Values of the 'pd_state' field compared against by
    # query_pd_connection() (the *_DISC values are not used in this class).
    SRC_CONNECT = 'SRC_READY'
    SNK_CONNECT = 'SNK_READY'
    SRC_DISC = 'SRC_DISCONNECTED'
    SNK_DISC = 'SNK_DISCONNECTED'

    # dualrole input/output values
    # Seconds to wait between setting dualrole and reading it back.
    DUALROLE_QUERY_DELAY = 0.25
    # Maps the set_pd_dualrole() argument to an index into the two
    # parallel lists below: the word sent on the console and the text
    # expected back from the 'pd dualrole' query.
    dual_index = {'on': 0, 'off': 1, 'snk': 2, 'src': 3}
    dualrole_cmd = ['on', 'off', 'sink', 'source']
    dualrole_resp = ['on', 'off', 'force sink', 'force source']

    # Dictionary for 'pd 0/1 state' parsing. Each value is a regular
    # expression whose first group captures the named field. Raw strings
    # keep the backslashes literal without relying on invalid escapes.
    PD_STATE_DICT = {
        'port': r'Port\s+([\w]+)',
        'role': r'Role:\s+([\w]+-[\w]+)',
        'pd_state': r'State:\s+([\w]+_[\w]+)',
        'flags': r'Flags:\s+([\w]+)',
        'polarity': r'(CC\d)'
    }

    def __init__(self, console):
        """Store the console used for all UART accesses.

        Console can be either usbpd, ec, or plankton_ec UART.
        This object will then be used by the class which creates
        the PDConsoleUtils class to send/receive commands to UART.

        @param console: object providing send_command() and
                        send_command_get_output()
        """
        # save console for UART access functions
        self.console = console

    def send_pd_command(self, cmd):
        """Send command to PD console UART

        @param cmd: pd command string
        """
        self.console.send_command(cmd)

    def send_pd_command_get_output(self, cmd, regexp):
        """Send command to PD console, wait for response

        @param cmd: pd command string
        @param regexp: regular expression for desired output; callers in
                       this class pass a list holding one pattern string

        @returns: whatever the console's send_command_get_output()
                  returns. Callers in this class index it as
                  result[0][n] (presumably n == 0 is the full match of
                  the first pattern and n >= 1 are its groups -- confirm
                  against the console implementation).
        """
        return self.console.send_command_get_output(cmd, regexp)

    def verify_pd_console(self):
        """Verify that PD commands exist on UART console

        Send 'help' command to UART console and look for a 'pd' entry.

        @returns: True if 'pd' is found, False if not
        """
        help_match = self.send_pd_command_get_output(
                'help', [r'(pd)\s+([\w]+)'])
        return help_match[0][1] == 'pd'

    def execute_pd_state_cmd(self, port):
        """Get PD state for specified channel

        The 'pd 0/1 state' command produces 5 fields. The full response
        line is captured and then parsed to extract each field to fill
        the dict containing port, polarity, role, pd_state, and flags.

        @param port: Type C PD port 0 or 1

        @returns: A dict with the 5 fields listed above

        @raises error.TestFail: if any of the 5 fields cannot be found
                                in the response line
        """
        pd_cmd = 'pd ' + str(port) + ' state'
        # Two FW versions for this command, get full line. The trailing
        # '\r' is a literal carriage return that ends the captured line.
        m = self.send_pd_command_get_output(pd_cmd,
                                            ['(Port.*) - (Role:.*)\r'])
        response_line = m[0][0]

        # Extract desired values from result string. items() is used
        # (not iteritems()) so this runs on both Python 2 and Python 3.
        state_result = {}
        for key, regexp in self.PD_STATE_DICT.items():
            value = re.search(regexp, response_line)
            if not value:
                raise error.TestFail('pd 0/1 state: %r value not found' % (key))
            state_result[key] = value.group(1)

        return state_result

    def get_pd_state(self, port):
        """Get the current PD state

        @param port: Type C PD port 0/1
        @returns: current pd state
        """
        return self.execute_pd_state_cmd(port)['pd_state']

    def get_pd_port(self, port):
        """Get the current PD port

        @param port: Type C PD port 0/1
        @returns: port field of the 'pd <port> state' response
        """
        return self.execute_pd_state_cmd(port)['port']

    def get_pd_role(self, port):
        """Get the current PD power role (source or sink)

        @param port: Type C PD port 0/1
        @returns: role field of the 'pd <port> state' response
        """
        return self.execute_pd_state_cmd(port)['role']

    def get_pd_flags(self, port):
        """Get the current PD flags

        @param port: Type C PD port 0/1
        @returns: flags field of the 'pd <port> state' response
        """
        return self.execute_pd_state_cmd(port)['flags']

    def get_pd_dualrole(self):
        """Get the current PD dualrole setting

        @returns: current PD dualrole setting, i.e. the text captured
                  after 'dual-role toggling:' in the console output
        """
        cmd = 'pd dualrole'
        dual_list = self.send_pd_command_get_output(
                cmd, [r'dual-role toggling:\s+([\w ]+)'])
        return dual_list[0][1]

    def set_pd_dualrole(self, value):
        """Set pd dualrole

        It can be set to either:
        1. on
        2. off
        3. snk (force sink mode)
        4. src (force source mode)
        After setting, the current value is read to confirm that it
        was set properly.

        @param value: One of the 4 options listed

        @raises KeyError: if value is not one of the 4 options
        @raises error.TestFail: if the value read back does not match
        """
        # Get index of the strings required for the console command
        dual_index = self.dual_index[value]
        expected = self.dualrole_resp[dual_index]
        # Create and send console command
        self.send_pd_command('pd dualrole ' + self.dualrole_cmd[dual_index])
        # Wait before querying the setting back
        time.sleep(self.DUALROLE_QUERY_DELAY)
        # Get current setting to verify that command was successful
        dual = self.get_pd_dualrole()
        # If it doesn't match, then raise error
        if dual != expected:
            raise error.TestFail("dualrole error: " + expected + " != " + dual)

    def query_pd_connection(self):
        """Determine if PD connection is present

        Try the 'pd 0/1 state' command and see if it's in either
        expected state of a connection. Record the port number
        that has an active connection. Port 0 is checked first and
        port 1 is only queried when port 0 is not connected.

        @returns: dict with keys:
                  'connect': True if either port is connected
                  'port': the connected port, or 1 (the last port
                          checked) when neither port is connected
                  'role': the connected pd state (SRC_CONNECT or
                          SNK_CONNECT); only present when 'connect'
                          is True
        """
        connected_states = (self.SRC_CONNECT, self.SNK_CONNECT)
        status = {'connect': False}
        for port in (0, 1):
            status['port'] = port
            state = self.get_pd_state(port)
            if state in connected_states:
                status['connect'] = True
                status['role'] = state
                break

        return status
207
208