blob: 209eb87bffed833fd4cc7ba2fe285b483716f49f [file] [log] [blame]
Scottc98bfd72016-02-22 16:18:53 -08001# Copyright 2016 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import re
6import logging
7import time
8
9from autotest_lib.client.common_lib import error
10from autotest_lib.server.cros.servo import pd_console
11
12
13class PDDevice(object):
14 """Base clase for all PD devices
15
16 This class provides a set of APIs for expected Type C PD required actions
17 in TypeC FAFT tests. The base class is specific for Type C devices that
18 do not have any console access.
19
20 """
21
22 def is_src(self):
23 """Checks if the port is connected as a source
24
25 """
26 raise NotImplementedError(
27 'is_src should be implemented in derived class')
28
29 def is_snk(self):
30 """Checks if the port is connected as a sink
31
32 @returns None
33 """
34 raise NotImplementedError(
35 'is_snk should be implemented in derived class')
36
37 def is_connected(self):
38 """Checks if the port is connected
39
40 @returns True if in a connected state, False otherwise
41 """
42 return self.is_src() or self.is_snk()
43
44 def is_disconnected(self):
45 """Checks if the port is disconnected
46
47 """
48 raise NotImplementedError(
49 'is_disconnected should be implemented in derived class')
50
51 def is_ufp(self):
52 """Checks if data role is UFP
53
54 """
55 raise NotImplementedError(
56 'is_ufp should be implemented in derived class')
57
58 def is_dfp(self):
59 """Checks if data role is DFP
60
61 """
62 raise NotImplementedError(
63 'is_dfp should be implemented in derived class')
64
65 def is_drp(self):
66 """Checks if dual role mode is supported
67
68 """
69 raise NotImplementedError(
70 'is_drp should be implemented in derived class')
71
72 def dr_swap(self):
73 """Attempts a data role swap
74
75 """
76 raise NotImplementedError(
77 'dr_swap should be implemented in derived class')
78
79 def pr_swap(self):
80 """Attempts a power role swap
81
82 """
83 raise NotImplementedError(
84 'pr_swap should be implemented in derived class')
85
86 def vbus_request(self, voltage):
87 """Requests a specific VBUS voltage from SRC
88
89 @param voltage: requested voltage level (5, 12, 20) in volts
90 """
91 raise NotImplementedError(
92 'vbus_request should be implemented in derived class')
93
94 def soft_reset(self):
95 """Initates a PD soft reset sequence
96
97 """
98 raise NotImplementedError(
99 'drp_set should be implemented in derived class')
100
101 def hard_reset(self):
102 """Initates a PD hard reset sequence
103
104 """
105 raise NotImplementedError(
106 'drp_set should be implemented in derived class')
107
108 def drp_set(self, mode):
109 """Sets dualrole mode
110
111 @param mode: desired dual role setting (on, off, snk, src)
112 """
113 raise NotImplementedError(
114 'drp_set should be implemented in derived class')
115
116 def drp_disconnect_connect(self, disc_time_sec):
117 """Force PD disconnect/connect via drp settings
118
119 @param disc_time_sec: Time in seconds between disconnect and reconnect
120 """
121 raise NotImplementedError(
122 'drp_disconnect_reconnect should be implemented in \
123 derived class')
124
125 def cc_disconnect_connect(self, disc_time_sec):
126 """Force PD disconnect/connect using Plankton fw command
127
128 @param disc_time_sec: Time in seconds between disconnect and reconnect
129 """
130 raise NotImplementedError(
131 'cc_disconnect_reconnect should be implemented in \
132 derived class')
133
134
135class PDConsoleDevice(PDDevice):
136 """Class for PD devices that have console access
137
138 This class contains methods for common PD actions for any PD device which
139 has UART console access. It inherits the PD device base class. In addition,
140 it stores both the UART console and port for the PD device.
141 """
142
143 def __init__(self, console, port):
144 """Initialization method
145
146 @param console: UART console object
147 @param port: USB PD port number
148 """
149 # Save UART console
150 self.console = console
151 # Instantiate PD utilities used by methods in this class
152 self.utils = pd_console.PDConsoleUtils(console)
153 # Save the PD port number for this device
154 self.port = port
155 # Not a Plankton device
156 self.is_plankton = False
157
158 def is_src(self):
159 """Checks if the port is connected as a source
160
161 @returns True if connected as SRC, False otherwise
162 """
163 state = self.utils.get_pd_state(self.port)
164 return bool(state == self.utils.SRC_CONNECT)
165
166 def is_snk(self):
167 """Checks if the port is connected as a sink
168
169 @returns True if connected as SNK, False otherwise
170 """
171 state = self.utils.get_pd_state(self.port)
172 return bool(state == self.utils.SNK_CONNECT)
173
174 def is_connected(self):
175 """Checks if the port is connected
176
177 @returns True if in a connected state, False otherwise
178 """
179 state = self.utils.get_pd_state(self.port)
180 return bool(state == self.utils.SNK_CONNECT or
181 state == self.utils.SRC_CONNECT)
182
183 def is_disconnected(self):
184 """Checks if the port is disconnected
185
186 @returns True if in a disconnected state, False otherwise
187 """
188 state = self.utils.get_pd_state(self.port)
189 return bool(state == self.utils.SRC_DISC or
190 state == self.utils.SNK_DISC)
191
192 def is_drp(self):
193 """Checks if dual role mode is supported
194
195 @returns True if dual role mode is 'on', False otherwise
196 """
197 return self.utils.is_pd_dual_role_enabled()
198
199 def drp_disconnect_connect(self, disc_time_sec):
200 """Disconnect/reconnect using drp mode settings
201
202 A PD console device doesn't have an explicit connect/disconnect
203 command. Instead, the dualrole mode setting is used to force
204 disconnects in devices which support this feature. To disconnect,
205 force the dualrole mode to be the opposite role of the current
206 connected state.
207
208 @param disc_time_sec: time in seconds to wait to reconnect
209
210 @returns True if device disconnects, then returns to a connected
211 state. False if either step fails.
212 """
213 # Dualrole mode must be supported
214 if self.is_drp() is False:
215 logging.warn('Device not DRP capable, unabled to force disconnect')
216 return False
217 # Force state will be the opposite of current connect state
218 if self.is_src():
219 drp_mode = 'snk'
220 else:
221 drp_mode = 'src'
222 # Force disconnect
223 self.drp_set(drp_mode)
224 # Wait for disconnect time
225 time.sleep(disc_time_sec)
226 # Verify that the device is disconnected
227 disconnect = self.is_disconnected()
228 # Restore default dualrole mode
229 self.drp_set('on')
230 # Allow enough time for protocol state machine
231 time.sleep(self.utils.CONNECT_TIME)
232 # Check if connected
233 connect = self.is_connected()
234 logging.info('Disconnect = %r, Connect = %r', disconnect, connect)
235 return bool(disconnect and connect)
236
237 def drp_set(self, mode):
238 """Sets dualrole mode
239
240 @param mode: desired dual role setting (on, off, snk, src)
241
242 @returns True is set was successful, False otherwise
243 """
244 # Set desired dualrole mode
245 self.utils.set_pd_dualrole(mode)
246 # Get the expected output
247 resp = self.utils.dualrole_resp[self.utils.dual_index[mode]]
248 # Get current setting
249 current = self.utils.get_pd_dualrole()
250 # Verify that setting is correct
251 return bool(resp == current)
252
253 def try_src(self, enable):
254 """Enables/Disables Try.SRC PD protocol setting
255
256 @param enable: True to enable, False to disable
257
258 @returns True is setting was successful, False if feature not
259 supported by the device, or not set as desired.
260 """
261 # Create Try.SRC pd command
262 cmd = 'pd trysrc %d' % int(enable)
263 # Try.SRC on/off is output, if supported feature
264 regex = ['Try\.SRC\s([\w]+)|(Parameter)']
265 m = self.utils.send_pd_command_get_output(cmd, regex)
266 # Determine if Try.SRC feature is supported
267 trysrc = re.search('Try\.SRC\s([\w]+)', m[0][0])
268 if not trysrc:
269 logging.warn('Try.SRC not supported on this PD device')
270 return False
271 # TrySRC is supported on this PD device, verify setting.
272 logging.info('Try.SRC mode = %s', trysrc.group(1))
273 if enable:
274 val = 'on'
275 else:
276 val = 'off'
277 return bool(val == m[0][1])
278
279
280class PDPlanktonDevice(PDConsoleDevice):
281 """Class for PD Plankton devices
282
283 This class contains methods for PD funtions which are unique to the
284 Plankton Type C factory testing board. It inherits all the methods
285 for PD console devices.
286 """
287
288 def __init__(self, console, port):
289 """Initialization method
290
291 @param console: UART console for this device
292 @param port: USB PD port number
293 """
294 # Instantiate the PD console object
295 super(PDPlanktonDevice, self).__init__(console, 0)
296 # Indicate this is Plankton device
297 self.is_plankton = True
298
299 def _toggle_plankton_drp(self):
300 """Issue 'usbc_action drp' Plankton command
301
302 @returns value of drp_enable in Plankton FW
303 """
304 drp_cmd = 'usbc_action drp'
305 drp_re = ['DRP\s=\s(\d)']
306 # Send DRP toggle command to Plankton and get value of 'drp_enable'
307 m = self.utils.send_pd_command_get_output(drp_cmd, drp_re)
308 return int(m[0][1])
309
310 def _enable_plankton_drp(self):
311 """Enable DRP mode on Plankton
312
313 DRP mode can only be toggled and is not able to be explicitly
314 enabled/disabled via the console. Therefore, this method will
315 toggle DRP mode until the console reply indicates that this
316 mode is enabled. The toggle happens a maximum of two times
317 in case this is called when it's already enabled.
318
319 @returns True when DRP mode is enabled, False if not successful
320 """
321 for attempt in xrange(2):
322 if self._toggle_plankton_drp() == True:
323 logging.info('Plankton DRP mode enabled')
324 return True
325 logging.error('Plankton DRP mode set failure')
326 return False
327
328 def cc_disconnect_connect(self, disc_time_sec):
329 """Disconnect/reconnect using Plankton
330
331 Plankton supports a feature which simulates a USB Type C disconnect
332 and reconnect.
333
334 @param disc_time_sec: Time in seconds for disconnect period.
335 """
336 DISC_DELAY = 100
337 disc_cmd = 'fake_disconnect %d %d' % (DISC_DELAY,
338 disc_time_sec * 1000)
339 self.utils.send_pd_command(disc_cmd)
340
341 def drp_set(self, mode):
342 """Sets dualrole mode
343
344 @param mode: desired dual role setting (on, off, snk, src)
345
346 @returns True if dualrole mode matches the requested value or
347 is successfully set to that value. False, otherwise.
348 """
349 # Get correct dualrole console response
350 resp = self.utils.dualrole_resp[self.utils.dual_index[mode]]
351 # Get current value of dualrole
352 drp = self.utils.get_pd_dualrole()
353 if drp == resp:
354 return True
355
356 if mode == 'on':
357 # Setting dpr_enable on Plankton will set dualrole mode to on
358 return self._enable_plankton_drp()
359 else:
360 # If desired setting is other than 'on', need to ensure that
361 # drp mode on Plankton is disabled.
362 if resp == 'on':
363 # This will turn off drp_enable flag and set dualmode to 'off'
364 return self._toggle_plankton_drp()
365 # With drp_enable flag off, can set to desired setting
366 return self.utils.set_pd_dualrole(mode)
367
368
369class PDPortPartner(object):
370 """Methods used to instantiate PD device objects
371
372 This class is initalized with a list of servo consoles. It
373 contains methods to determine if USB PD devices are accessible
374 via the consoles and attempts to determine USB PD port partners.
375 A PD device is USB PD port specific, a single console may access
376 multiple PD devices.
377
378 """
379
380 def __init__(self, consoles):
381 """Initialization method
382
383 @param consoles: list of servo consoles
384 """
385 self.consoles = consoles
386
387 def _send_pd_state(self, port, console):
388 """Tests if PD device exists on a given port number
389
390 @param port: USB PD port number to try
391 @param console: servo UART console
392
393 @returns True if 'pd <port> state' command gives a valid
394 response, False otherwise
395 """
396 cmd = 'pd %d state' % port
397 regex = r'(Port C\d)|(Parameter)'
398 m = console.send_command_get_output(cmd, [regex])
399 # If PD port exists, then output will be Port C0 or C1
400 regex = r'Port C{0}'.format(port)
401 if re.search(regex, m[0][0]):
402 return True
403 return False
404
405 def _find_num_pd_ports(self, console):
406 """Determine number of PD ports for a given console
407
408 @param console: uart console accssed via servo
409
410 @returns: number of PD ports accessible via console
411 """
412 MAX_PORTS = 2
413 num_ports = 0
414 for port in xrange(MAX_PORTS):
415 if self._send_pd_state(port, console):
416 num_ports += 1
417 return num_ports
418
419 def _is_pd_console(self, console):
420 """Check if pd option exists in console
421
422 @param console: uart console accssed via servo
423
424 @returns: True if 'pd' is found, False otherwise
425 """
426 try:
427 m = console.send_command_get_output('help', [r'(pd)\s+'])
428 return True
429 except error.TestFail:
430 return False
431
432 def _is_plankton_console(self, console):
433 """Check for Plankton console
434
435 This method looks for a console command option 'usbc_action' which
436 is unique to Plankton PD devices.
437
438 @param console: uart console accssed via servo
439
440 @returns True if usbc_action command is present, False otherwise
441 """
442 try:
443 m = console.send_command_get_output('help', [r'(usbc_action)'])
444 return True
445 except error.TestFail:
446 return False
447
448 def _check_port_pair(self, dev_pair):
449 """Check if two PD devices could be connected
450
451 If two USB PD devices are connected, then they should be in
452 either the SRC_READY or SNK_READY states and have opposite
453 power roles. In addition, they must be on different servo
454 consoles.
455
456 @param: list of two possible PD port parters
457
458 @returns True if not the same console and both PD devices
459 are a plausible pair based only on their PD states.
460 """
461 # Don't test if on the same servo console
462 if dev_pair[0].console == dev_pair[1].console:
463 logging.info('PD Devices are on same platform -> cant be a pair')
464 return False
465 # Must be SRC <--> SNK or SNK <--> SRC
466 return bool((dev_pair[0].is_src() and dev_pair[1].is_snk()) or
467 (dev_pair[0].is_snk() and dev_pair[1].is_src()))
468
469 def _verify_plankton_connection(self, dev_pair):
470 """Verify DUT to Plankton PD connection
471
472 This method checks for a Plankton PD connection for the
473 given port by first verifying if a PD connection is present.
474 If found, then it uses a Plankton feature to force a PD disconnect.
475 If the port is no longer in the connected state, and following
476 a delay, is found to be back in the connected state, then
477 a DUT pd to Plankton connection is verified.
478
479 @param dev_pair: list of two PD devices
480
481 @returns True if DUT to Plankton pd connection is verified
482 """
483 DISC_CHECK_TIME = .5
484 DISC_WAIT_TIME = 2
485 CONNECT_TIME = 4
486
487 if not self._check_port_pair(dev_pair):
488 return False
489
490 for index in xrange(len(dev_pair)):
491 try:
492 # Force PD disconnect
493 dev_pair[index].cc_disconnect_connect(DISC_WAIT_TIME)
494 time.sleep(DISC_CHECK_TIME)
495 # Verify that both devices are now disconnected
496 if (dev_pair[0].is_disconnected() and
497 dev_pair[1].is_disconnected()):
498 # Allow enough time for reconnection
499 time.sleep(DISC_WAIT_TIME + CONNECT_TIME)
500 if self._check_port_pair(dev_pair):
501 # Have verifed a pd disconnect/reconnect sequence
502 logging.info('Plankton <-> DUT pair found')
503 return True
504 else:
505 # Delay to allow
506 time.sleep(DISC_WAIT_TIME + CONNECT_TIME)
507 except NotImplementedError:
508 logging.info('dev %d is not Plankton', index)
509 return False
510
511 def identify_pd_devices(self):
512 """Instantiate PD devices present in test setup
513
514 @returns list of 2 PD devices if a DUT <-> Plankton found. If
515 not found, then returns an empty list.
516 """
517 devices = []
518 # For each possible uart console, check to see if a PD console
519 # is present and determine the number of PD ports.
520 for console in self.consoles:
521 if self._is_pd_console(console):
522 is_plank = self._is_plankton_console(console)
523 num_ports = self._find_num_pd_ports(console)
524 # For each PD port that can be accessed via the console,
525 # instantiate either PDConsole or PDPlankton device.
526 for port in xrange(num_ports):
527 if is_plank:
528 logging.info('Plankton PD Device on port %d', port)
529 devices.append(PDPlanktonDevice(console, port))
530 else:
531 devices.append(PDConsoleDevice(console, port))
532 logging.info('Console PD Device on port %d', port)
533
534 # Determine PD port partners in the list of PD devices. Note, that
535 # there can be PD devices which are not accessible via a uart console,
536 # but are connected to a PD port which is accessible.
537 test_pair = []
538 for deva in devices:
539 for dev_idx in range(devices.index(deva) + 1, len(devices)):
540 devb = devices[dev_idx]
541 pair = [deva, devb]
542 if self._verify_plankton_connection(pair):
543 test_pair = pair
544 devices.remove(deva)
545 devices.remove(devb)
546 return test_pair
547