blob: 6885d32a2cdf32698e784df25b1db49dffb929d9 [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4
Scott07a848f2016-01-12 15:04:52 -08005import re
Scott3d5b4782016-01-29 18:33:33 -08006import logging
Scottfe06ed82015-11-05 17:15:01 -08007import time
8
9from autotest_lib.client.common_lib import error
10
class PDConsoleUtils(object):
    """Provides a set of methods common to USB PD FAFT tests.

    Each instance of this class is associated with a particular
    servo UART console. USB PD tests will typically use the console
    command 'pd' and its subcommands to control/monitor Type C PD
    connections. The servo object used for UART operations is
    passed in and stored when this object is created.

    """

    # Values of the 'State:' field reported by 'pd <port> state'
    SRC_CONNECT = 'SRC_READY'
    SNK_CONNECT = 'SNK_READY'
    SRC_DISC = 'SRC_DISCONNECTED'
    SNK_DISC = 'SNK_DISCONNECTED'
    # Number of PD ports probed when searching for a connection
    PD_MAX_PORTS = 2
    # Reconnect allowance; callers add it to a time.sleep() duration
    CONNECT_TIME = 4

    # dualrole input/output values
    # Delay passed to time.sleep() between setting dualrole and reading it back
    DUALROLE_QUERY_DELAY = 0.25
    # Maps the caller-facing option name to an index into the two lists below
    dual_index = {'on': 0, 'off': 1, 'snk': 2, 'src': 3}
    # Argument sent with the 'pd dualrole' console command
    dualrole_cmd = ['on', 'off', 'sink', 'source']
    # Text expected back from the 'pd dualrole' query for each setting
    dualrole_resp = ['on', 'off', 'force sink', 'force source']

    # Dictionary for 'pd 0/1 state' parsing. Each pattern has exactly one
    # capture group holding the field value.
    PD_STATE_DICT = {
        'port': r'Port\s+([\w]+)',
        'role': r'Role:\s+([\w]+-[\w]+)',
        'pd_state': r'State:\s+([\w]+_[\w]+)',
        'flags': r'Flags:\s+([\w]+)',
        'polarity': r'(CC\d)'
    }

    # Dictionary for PD control message types
    PD_CONTROL_MSG_MASK = 0x1f
    PD_CONTROL_MSG_DICT = {
        'GoodCRC': 1,
        'GotoMin': 2,
        'Accept': 3,
        'Reject': 4,
        'Ping': 5,
        'PS_RDY': 6,
        'Get_Source_Cap': 7,
        'Get_Sink_Cap': 8,
        'DR_Swap': 9,
        'PR_Swap': 10,
        'VCONN_Swap': 11,
        'Wait': 12,
        'Soft_Reset': 13
    }

    # Dictionary for PD firmware state flags (bit masks into the flags word)
    PD_STATE_FLAGS_DICT = {
        'power_swap': 1 << 1,
        'data_swap': 1 << 2,
        'data_swap_active': 1 << 3,
        'vconn_on': 1 << 12
    }

    def __init__(self, console):
        """Store the UART console used for all PD commands.

        Console can be either usbpd, ec, or plankton_ec UART.
        This object will then be used by the class which creates
        the PDConsoleUtils class to send/receive commands to UART.

        @param console: object providing send_command() and
                        send_command_get_output()
        """
        # save console for UART access functions
        self.console = console

    def send_pd_command(self, cmd):
        """Send command to PD console UART

        @param cmd: pd command string
        """
        self.console.send_command(cmd)

    def send_pd_command_get_output(self, cmd, regexp):
        """Send command to PD console, wait for response

        @param cmd: pd command string
        @param regexp: list of regular expressions for the desired output

        @returns: whatever the console's send_command_get_output() returns;
                  callers in this class index it as result[i][0] for the
                  full match of pattern i and result[i][n] for group n
        """
        return self.console.send_command_get_output(cmd, regexp)

    def verify_pd_console(self):
        """Verify that PD commands exist on UART console

        Send 'help' command to UART console

        @returns: True if 'pd' is found, False if not
        """
        matches = self.console.send_command_get_output(
                'help', [r'(pd)\s+([\w]+)'])
        return matches[0][1] == 'pd'

    def execute_pd_state_cmd(self, port):
        """Get PD state for specified channel

        The 'pd 0/1 state' command produces 5 fields. The full response
        line is captured and then parsed to extract each field to fill
        the dict containing port, polarity, role, pd_state, and flags.

        @param port: Type C PD port 0 or 1

        @returns: A dict with the 5 fields listed above
        @raises error.TestFail: if any field is missing from the response
        """
        pd_cmd = 'pd ' + str(port) + ' state'
        # Two FW versions for this command, get full line.
        # NOTE: this pattern is intentionally not a raw string so the
        # console still receives a literal carriage return.
        m = self.send_pd_command_get_output(pd_cmd,
                                            ['(Port.*) - (Role:.*)\r'])
        state_line = m[0][0]

        # Extract desired values from result string
        state_result = {}
        # items() rather than iteritems() so this runs on Python 2 and 3
        for key, regexp in self.PD_STATE_DICT.items():
            value = re.search(regexp, state_line)
            if value is None:
                raise error.TestFail('pd 0/1 state: %r value not found' % (key))
            state_result[key] = value.group(1)

        return state_result

    def get_pd_state(self, port):
        """Get the current PD state

        @param port: Type C PD port 0/1
        @returns: current pd state
        """
        return self.execute_pd_state_cmd(port)['pd_state']

    def get_pd_port(self, port):
        """Get the current PD port

        @param port: Type C PD port 0/1
        @returns: port field of the 'pd 0/1 state' response
        """
        return self.execute_pd_state_cmd(port)['port']

    def get_pd_role(self, port):
        """Get the current PD power role (source or sink)

        @param port: Type C PD port 0/1
        @returns: role field of the 'pd 0/1 state' response
        """
        return self.execute_pd_state_cmd(port)['role']

    def get_pd_flags(self, port):
        """Get the current PD flags

        @param port: Type C PD port 0/1
        @returns: flags field of the 'pd 0/1 state' response, as a string
        """
        return self.execute_pd_state_cmd(port)['flags']

    def get_pd_dualrole(self):
        """Get the current PD dualrole setting

        @returns: current PD dualrole setting
        """
        cmd = 'pd dualrole'
        dual_list = self.send_pd_command_get_output(
                cmd, [r'dual-role toggling:\s+([\w ]+)'])
        return dual_list[0][1]

    def set_pd_dualrole(self, value):
        """Set pd dualrole

        It can be set to either:
        1. on
        2. off
        3. snk (force sink mode)
        4. src (force source mode)
        After setting, the current value is read to confirm that it
        was set properly.

        @param value: One of the 4 options listed
        @raises KeyError: if value is not one of the 4 options
        @raises error.TestFail: if the setting read back does not match
        """
        # Get string required for console command
        dual_index = self.dual_index[value]
        expected = self.dualrole_resp[dual_index]
        # Create console command
        cmd = 'pd dualrole ' + self.dualrole_cmd[dual_index]
        self.console.send_command(cmd)
        time.sleep(self.DUALROLE_QUERY_DELAY)
        # Get current setting to verify that command was successful
        dual = self.get_pd_dualrole()
        # If it doesn't match, then raise error
        if dual != expected:
            raise error.TestFail("dualrole error: " + expected + " != " + dual)

    def query_pd_connection(self):
        """Determine if PD connection is present

        Try the 'pd <port> state' command on each port in turn, starting
        with port 0, and see if it's in either expected state of a
        connection. Record the port number that has an active connection.

        @returns: dict with keys 'connect' (bool) and 'port' (int). When a
                  connection is found, 'role' is also set to the connected
                  state string. When none is found, 'port' is the last
                  port that was checked and 'role' is absent.
        """
        status = {'connect': False, 'port': 0}
        for port in range(self.PD_MAX_PORTS):
            status['port'] = port
            state = self.get_pd_state(port)
            if state in (self.SRC_CONNECT, self.SNK_CONNECT):
                status['connect'] = True
                status['role'] = state
                # Stop at the first connected port
                break

        return status

    def disable_pd_console_debug(self):
        """Turn off PD console debug

        """
        self.send_pd_command('pd dump 0')

    def enable_pd_console_debug(self):
        """Enable PD console debug level 1

        """
        self.send_pd_command('pd dump 1')

    def is_pd_flag_set(self, port, key):
        """Test a bit in PD protocol state flags

        The flag word contains various PD protocol state information.
        This method allows for a specific flag to be tested.

        @param port: Port which has the active PD connection
        @param key: dict key to retrieve the flag bit mapping

        @returns True if the bit to be tested is set
        """
        # The flags field is parsed as a base-16 string
        pd_flags = int(self.get_pd_flags(port), 16)
        return bool(self.PD_STATE_FLAGS_DICT[key] & pd_flags)

    def is_pd_connected(self, port):
        """Check if a PD connection is active

        @param port: port to be used for pd console commands

        @returns True if port is in connected state
        """
        state = self.get_pd_state(port)
        return state in (self.SRC_CONNECT, self.SNK_CONNECT)

    def is_pd_dual_role_enabled(self):
        """Check if a PD device is in dualrole mode

        @returns True if dualrole mode is active, False otherwise
        """
        drp = self.get_pd_dualrole()
        return drp == self.dualrole_resp[self.dual_index['on']]
282
283
class PDConnectionUtils(PDConsoleUtils):
    """Provides a set of methods common to USB PD FAFT tests

    This Class is used for PD utility methods that require access
    to both Plankton and DUT PD consoles.

    The methods below call is_pd_connected() and read CONNECT_TIME on the
    DUT console object and call send_pd_command() on the Plankton console
    object, so both are presumably PDConsoleUtils instances rather than
    raw UART consoles -- confirm against callers.

    """

    def __init__(self, dut_console, plankton_console):
        """
        @param dut_console: PD console object for DUT
        @param plankton_console: PD console object for Plankton
        """
        # save console for DUT PD UART access functions
        self.dut_console = dut_console
        # save console for Plankton UART access functions
        self.plankton_console = plankton_console
        # The base class stores dut_console as self.console
        super(PDConnectionUtils, self).__init__(dut_console)

    def _verify_plankton_connection(self, port):
        """Verify DUT to Plankton PD connection

        This method checks for a Plankton PD connection for the
        given port by first verifying if a PD connection is present.
        If found, then it uses a Plankton feature to force a PD disconnect.
        If the port is no longer in the connected state, and following
        a delay, is found to be back in the connected state, then
        a DUT pd to Plankton connection is verified.

        @param port: DUT pd port to test

        @returns True if DUT to Plankton pd connection is verified
        """
        DISCONNECT_CHECK_TIME = 0.5
        DISCONNECT_TIME_SEC = 2
        # plankton console command to force PD disconnect; the second
        # argument is the disconnect duration in milliseconds.
        # NOTE(review): meaning of the first argument (100) is not visible
        # here -- presumably a start delay in ms; confirm.
        disc_cmd = 'fake_disconnect 100 %d' % (DISCONNECT_TIME_SEC * 1000)

        # Only check for Plankton if DUT has active PD connection
        if not self.dut_console.is_pd_connected(port):
            return False

        # Attempt to force PD disconnection
        self.plankton_console.send_pd_command(disc_cmd)
        time.sleep(DISCONNECT_CHECK_TIME)
        # Verify that DUT PD port is no longer connected
        still_connected = self.dut_console.is_pd_connected(port)

        # Wait for disconnect timer and give time to reconnect. This wait
        # is needed on both paths: if this port stayed connected, the
        # command could have disconnected the other port, which must be
        # allowed to reconnect before exiting.
        time.sleep(self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC)
        if still_connected:
            return False

        if self.dut_console.is_pd_connected(port):
            logging.info('Plankton connection verfied on port %d', port)
            return True
        return False

    def find_dut_to_plankton_connection(self):
        """Find the PD port which is connected to Plankton

        @returns DUT pd port number if found, None otherwise
        """
        # range() rather than xrange() so this runs on Python 2 and 3
        for port in range(self.dut_console.PD_MAX_PORTS):
            # Check for DUT to Plankton connection on port
            if self._verify_plankton_connection(port):
                # Plankton PD connection found so exit
                return port
        return None