blob: 0a0aeca2c4a11295b373f4ef2b3a0bb09607cc2b [file] [log] [blame]
Scottfe06ed82015-11-05 17:15:01 -08001# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
Scott07a848f2016-01-12 15:04:52 -08005import re
Scott3d5b4782016-01-29 18:33:33 -08006import logging
Scottfe06ed82015-11-05 17:15:01 -08007import time
8
9from autotest_lib.client.common_lib import error
10
class PDConsoleUtils(object):
    """Provides a set of methods common to USB PD FAFT tests

    Each instance of this class is associated with a particular
    servo UART console. USB PD tests will typically use the console
    command 'pd' and its subcommands to control/monitor Type C PD
    connections. The servo object used for UART operations is
    passed in and stored when this object is created.

    The console object must provide send_command(cmd) and
    send_command_get_output(cmd, regexp_list). The code in this class
    indexes the result of send_command_get_output as result[0][0] for
    the full match and result[0][N] for capture group N.

    """

    # Values reported in the 'State:' field of 'pd <port> state'
    SRC_CONNECT = 'SRC_READY'
    SNK_CONNECT = 'SNK_READY'
    SRC_DISC = 'SRC_DISCONNECTED'
    SNK_DISC = 'SNK_DISCONNECTED'
    PD_MAX_PORTS = 2
    # Seconds to sleep after an action that renegotiates the connection
    CONNECT_TIME = 4

    # dualrole input/output values
    # Seconds to sleep between setting dualrole and reading it back
    DUALROLE_QUERY_DELAY = 0.25
    # dual_index maps the caller-facing value to the position of the
    # matching console argument (dualrole_cmd) and the matching console
    # readback string (dualrole_resp).
    dual_index = {'on': 0, 'off': 1, 'snk': 2, 'src': 3}
    dualrole_cmd = ['on', 'off', 'sink', 'source']
    dualrole_resp = ['on', 'off', 'force sink', 'force source']

    # Dictionary for 'pd 0/1 state' parsing
    PD_STATE_DICT = {
        'port': r'Port\s+([\w]+)',
        'role': r'Role:\s+([\w]+-[\w]+)',
        'pd_state': r'State:\s+([\w]+_[\w]+)',
        'flags': r'Flags:\s+([\w]+)',
        'polarity': r'(CC\d)',
    }

    # Dictionary for PD control message types
    PD_CONTROL_MSG_MASK = 0x1f
    PD_CONTROL_MSG_DICT = {
        'GoodCRC': 1,
        'GotoMin': 2,
        'Accept': 3,
        'Reject': 4,
        'Ping': 5,
        'PS_RDY': 6,
        'Get_Source_Cap': 7,
        'Get_Sink_Cap': 8,
        'DR_Swap': 9,
        'PR_Swap': 10,
        'VCONN_Swap': 11,
        'Wait': 12,
        'Soft_Reset': 13,
    }

    # Dictionary for PD firmware state flags
    PD_STATE_FLAGS_DICT = {
        'power_swap': 1 << 1,
        'data_swap': 1 << 2,
        'data_swap_active': 1 << 3,
        'vconn_on': 1 << 12,
    }

    def __init__(self, console):
        """Console can be either usbpd, ec, or plankton_ec UART

        This object will then be used by the class which creates
        the PDConsoleUtils class to send/receive commands to UART

        @param console: UART console object used for all commands
        """
        # save console for UART access functions
        self.console = console

    def _is_connected_state(self, state):
        """Tell whether a pd state string is one of the connected states

        @param state: value of the 'State:' field from 'pd <port> state'

        @returns: True if state is SRC_CONNECT or SNK_CONNECT
        """
        return state in (self.SRC_CONNECT, self.SNK_CONNECT)

    def send_pd_command(self, cmd):
        """Send command to PD console UART

        @param cmd: pd command string
        """
        self.console.send_command(cmd)

    def send_pd_command_get_output(self, cmd, regexp):
        """Send command to PD console, wait for response

        @param cmd: pd command string
        @param regexp: regular expression for desired output

        @returns: whatever the console's send_command_get_output returns
        """
        return self.console.send_command_get_output(cmd, regexp)

    def send_pd_command_get_reply_msg(self, cmd):
        """Send PD protocol msg, get PD control msg reply

        The PD console debug mode is enabled prior to sending
        a pd protocol message. This allows the
        control message reply to be extracted. The debug mode
        is disabled prior to exiting, including when the command or
        the parsing of its reply raises.

        @param cmd: pd command to issue to the UART console

        @returns: PD control header message
        """
        # Enable PD console debug mode to show control messages
        self.enable_pd_console_debug()
        try:
            m = self.send_pd_command_get_output(cmd, [r'RECV\s([\w]+)'])
            # The captured word is parsed as hex; the mask keeps only the
            # bits that carry the control message type.
            return int(m[0][1], 16) & self.PD_CONTROL_MSG_MASK
        finally:
            # Never leave the console in debug mode, even on failure
            self.disable_pd_console_debug()

    def verify_pd_console(self):
        """Verify that PD commands exist on UART console

        Send 'help' command to UART console

        @returns: True if 'pd' is found, False if not
        """
        found = self.send_pd_command_get_output('help',
                                                [r'(pd)\s+([\w]+)'])
        return found[0][1] == 'pd'

    def execute_pd_state_cmd(self, port):
        """Get PD state for specified channel

        pd 0/1 state command produces 5 fields. The full response
        line is captured and then parsed to extract each field to fill
        the dict containing port, polarity, role, pd_state, and flags.

        @param port: Type C PD port 0 or 1

        @returns: A dict with the 5 fields listed above

        @raises error.TestFail: if any field is missing from the response
        """
        pd_cmd = ' '.join(['pd', str(port), 'state'])
        # Two FW versions for this command, get full line.
        m = self.send_pd_command_get_output(pd_cmd,
                                            ['(Port.*) - (Role:.*)\r'])
        full_line = m[0][0]

        # Extract desired values from result string
        state_result = {}
        for key, regexp in self.PD_STATE_DICT.items():
            value = re.search(regexp, full_line)
            if not value:
                raise error.TestFail('pd 0/1 state: %r value not found' % (key))
            state_result[key] = value.group(1)

        return state_result

    def get_pd_state(self, port):
        """Get the current PD state

        @param port: Type C PD port 0/1

        @returns: current pd state
        """
        return self.execute_pd_state_cmd(port)['pd_state']

    def get_pd_port(self, port):
        """Get the current PD port

        @param port: Type C PD port 0/1

        @returns: port field reported by the 'pd <port> state' command
        """
        return self.execute_pd_state_cmd(port)['port']

    def get_pd_role(self, port):
        """Get the current PD power role (source or sink)

        @param port: Type C PD port 0/1

        @returns: role field reported by the 'pd <port> state' command
        """
        return self.execute_pd_state_cmd(port)['role']

    def get_pd_flags(self, port):
        """Get the current PD flags

        @param port: Type C PD port 0/1

        @returns: flags field (a string) reported by 'pd <port> state'
        """
        return self.execute_pd_state_cmd(port)['flags']

    def get_pd_dualrole(self):
        """Get the current PD dualrole setting

        @returns: current PD dualrole setting
        """
        cmd = 'pd dualrole'
        dual_list = self.send_pd_command_get_output(
                cmd, [r'dual-role toggling:\s+([\w ]+)'])
        return dual_list[0][1]

    def set_pd_dualrole(self, value):
        """Set pd dualrole

        It can be set to either:
        1. on
        2. off
        3. snk (force sink mode)
        4. src (force source mode)
        After setting, the current value is read to confirm that it
        was set properly.

        @param value: One of the 4 options listed

        @raises KeyError: if value is not one of the 4 options
        @raises error.TestFail: if the readback does not match the request
        """
        # Get index of the strings required for command and readback
        index = self.dual_index[value]
        expected = self.dualrole_resp[index]
        # Create and send console command
        self.send_pd_command('pd dualrole ' + self.dualrole_cmd[index])
        time.sleep(self.DUALROLE_QUERY_DELAY)
        # Get current setting to verify that command was successful
        dual = self.get_pd_dualrole()
        # If it doesn't match, then raise error
        if dual != expected:
            raise error.TestFail("dualrole error: " + expected + " != " + dual)

    def query_pd_connection(self):
        """Determine if PD connection is present

        Try the 'pd 0/1 state' command on each port in turn and see if
        it's in either expected state of a connection. Record the port
        number that has an active connection.

        @returns: dict with keys 'port' and 'connect', plus 'role' when a
                  connection was found. 'role' holds the pd state string
                  (SRC_CONNECT or SNK_CONNECT). When no port is connected,
                  'connect' is False and 'port' is the last port probed.
        """
        status = {'connect': False}
        for port in range(self.PD_MAX_PORTS):
            status['port'] = port
            state = self.get_pd_state(port)
            if self._is_connected_state(state):
                status['connect'] = True
                status['role'] = state
                # Stop at the first connected port
                break

        return status

    def swap_power_role(self, port):
        """Attempt a power role swap

        This method attempts to execute a power role swap. A check
        is made to ensure that dualrole mode is enabled and that
        a PD contract is currently established. If both checks pass,
        then the power role swap command is issued. After a delay,
        if a PD contract is established and the current state does
        not equal the starting state, then it was successful.

        @param port: pd port number

        @returns: True if power swap is successful, False otherwise.
        """
        # Check preconditions before attempting the swap
        if not self.is_pd_dual_role_enabled():
            logging.info('Dualrole Mode not enabled!')
            return False
        if not self.is_pd_connected(port):
            logging.info('PD contract not established!')
            return False
        # Get starting state
        current_pr = self.get_pd_state(port)
        self.send_pd_command('pd %d swap power' % port)
        # Give the connection time to settle before reading it again
        time.sleep(self.CONNECT_TIME)
        new_pr = self.get_pd_state(port)
        logging.info('Power swap: %s -> %s', current_pr, new_pr)
        if not self.is_pd_connected(port):
            return False
        return current_pr != new_pr

    def disable_pd_console_debug(self):
        """Turn off PD console debug

        """
        self.send_pd_command('pd dump 0')

    def enable_pd_console_debug(self):
        """Enable PD console debug level 1

        """
        self.send_pd_command('pd dump 1')

    def is_pd_flag_set(self, port, key):
        """Test a bit in PD protocol state flags

        The flag word contains various PD protocol state information.
        This method allows for a specific flag to be tested.

        @param port: Port which has the active PD connection
        @param key: dict key to retrieve the flag bit mapping

        @returns True if the bit to be tested is set

        @raises KeyError: if key is not in PD_STATE_FLAGS_DICT
        """
        # The flags field is a string and is parsed as hex
        pd_flags = int(self.get_pd_flags(port), 16)
        return bool(self.PD_STATE_FLAGS_DICT[key] & pd_flags)

    def is_pd_connected(self, port):
        """Check if a PD connection is active

        @param port: port to be used for pd console commands

        @returns True if port is in connected state
        """
        return self._is_connected_state(self.get_pd_state(port))

    def is_pd_dual_role_enabled(self):
        """Check if a PD device is in dualrole mode

        @returns True if dualrole mode is active, False otherwise
        """
        drp = self.get_pd_dualrole()
        return drp == self.dualrole_resp[self.dual_index['on']]
332
333
class PDConnectionUtils(PDConsoleUtils):
    """Provides a set of methods common to USB PD FAFT tests

    This Class is used for PD utility methods that require access
    to both Plankton and DUT PD consoles.

    """

    def __init__(self, dut_console, plankton_console):
        """
        @param dut_console: PD console object for DUT
        @param plankton_console: PD console object for Plankton
        """
        # NOTE(review): the methods below call is_pd_connected(),
        # send_pd_command() and CONNECT_TIME on these objects, so they
        # presumably are PDConsoleUtils instances -- confirm with callers.
        # save console for DUT PD UART access functions
        self.dut_console = dut_console
        # save console for Plankton UART access functions
        self.plankton_console = plankton_console
        super(PDConnectionUtils, self).__init__(dut_console)

    def _verify_plankton_connection(self, port):
        """Verify DUT to Plankton PD connection

        This method checks for a Plankton PD connection for the
        given port by first verifying if a PD connection is present.
        If found, then it uses a Plankton feature to force a PD disconnect.
        If the port is no longer in the connected state, and following
        a delay, is found to be back in the connected state, then
        a DUT pd to Plankton connection is verified.

        @param port: DUT pd port to test

        @returns True if DUT to Plankton pd connection is verified
        """
        DISCONNECT_CHECK_TIME = 0.5
        DISCONNECT_TIME_SEC = 2
        # plankton console command to force PD disconnect
        disc_cmd = 'fake_disconnect 100 %d' % (DISCONNECT_TIME_SEC * 1000)
        # Time for the forced disconnect to end and the port to reconnect
        reconnect_wait = self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC

        # Only check for Plankton if DUT has active PD connection
        if not self.dut_console.is_pd_connected(port):
            return False

        # Attempt to force PD disconnection
        self.plankton_console.send_pd_command(disc_cmd)
        time.sleep(DISCONNECT_CHECK_TIME)
        # Verify that DUT PD port is no longer connected
        if not self.dut_console.is_pd_connected(port):
            # Wait for disconnect timer and give time to reconnect
            time.sleep(reconnect_wait)
            if self.dut_console.is_pd_connected(port):
                logging.info('Plankton connection verified on port %d',
                             port)
                return True
        else:
            # Could have disconnected other port, allow it to reconnect
            # before exiting.
            time.sleep(reconnect_wait)
        return False

    def find_dut_to_plankton_connection(self):
        """Find the PD port which is connected to Plankton

        @returns DUT pd port number if found, None otherwise
        """
        for port in range(self.dut_console.PD_MAX_PORTS):
            # Check for DUT to Plankton connection on port
            if self._verify_plankton_connection(port):
                # Plankton PD connection found so exit
                return port
        return None
400 return None