blob: f0fb2fe9f05e813b0f547b0536bc13fbb836e4a3 [file] [log] [blame]
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4
5import re
6import logging
7import time
8
9from autotest_lib.client.common_lib import error
10from autotest_lib.server.cros.servo import pd_console
11
12
class PDDevice(object):
    """Base class for all PD devices

    This class provides a set of APIs for expected Type C PD required actions
    in TypeC FAFT tests. The base class is specific for Type C devices that
    do not have any console access, so every action below raises
    NotImplementedError until a derived class overrides it. The only method
    with a real implementation is is_connected(), which is built on top of
    is_src() and is_snk().

    """

    def is_src(self):
        """Checks if the port is connected as a source

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
            'is_src should be implemented in derived class')

    def is_snk(self):
        """Checks if the port is connected as a sink

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
            'is_snk should be implemented in derived class')

    def is_connected(self):
        """Checks if the port is connected

        Relies on is_src() and is_snk(), so it only works once a derived
        class has implemented both of them.

        @returns True if in a connected state, False otherwise
        """
        return self.is_src() or self.is_snk()

    def is_disconnected(self):
        """Checks if the port is disconnected

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
            'is_disconnected should be implemented in derived class')

    def is_ufp(self):
        """Checks if data role is UFP

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
            'is_ufp should be implemented in derived class')

    def is_dfp(self):
        """Checks if data role is DFP

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
            'is_dfp should be implemented in derived class')

    def is_drp(self):
        """Checks if dual role mode is supported

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
            'is_drp should be implemented in derived class')

    def dr_swap(self):
        """Attempts a data role swap

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
            'dr_swap should be implemented in derived class')

    def pr_swap(self):
        """Attempts a power role swap

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
            'pr_swap should be implemented in derived class')

    def vbus_request(self, voltage):
        """Requests a specific VBUS voltage from SRC

        @param voltage: requested voltage level (5, 12, 20) in volts

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
            'vbus_request should be implemented in derived class')

    def soft_reset(self, states_list=None):
        """Initiates a PD soft reset sequence

        @param states_list: list of expected PD state transitions

        @raises NotImplementedError: always, in this base class
        """
        # The message names this method (it used to name drp_set).
        raise NotImplementedError(
            'soft_reset should be implemented in derived class')

    def hard_reset(self):
        """Initiates a PD hard reset sequence

        @raises NotImplementedError: always, in this base class
        """
        # The message names this method (it used to name drp_set).
        raise NotImplementedError(
            'hard_reset should be implemented in derived class')

    def drp_set(self, mode):
        """Sets dualrole mode

        @param mode: desired dual role setting (on, off, snk, src)

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
            'drp_set should be implemented in derived class')

    def drp_disconnect_connect(self, disc_time_sec):
        """Force PD disconnect/connect via drp settings

        @param disc_time_sec: Time in seconds between disconnect and reconnect

        @raises NotImplementedError: always, in this base class
        """
        # Single-line literal: a backslash continuation inside the string
        # would embed the next line's indentation in the message.
        raise NotImplementedError(
            'drp_disconnect_connect should be implemented in derived class')

    def cc_disconnect_connect(self, disc_time_sec):
        """Force PD disconnect/connect using Plankton fw command

        @param disc_time_sec: Time in seconds between disconnect and reconnect

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
            'cc_disconnect_connect should be implemented in derived class')
134
135
class PDConsoleDevice(PDDevice):
    """Class for PD devices that have console access

    This class contains methods for common PD actions for any PD device which
    has UART console access. It inherits the PD device base class. In addition,
    it stores both the UART console and port for the PD device.
    """

    def __init__(self, console, port):
        """Initialization method

        @param console: UART console object
        @param port: USB PD port number
        """
        # Save UART console
        self.console = console
        # Instantiate PD utilities used by methods in this class
        self.utils = pd_console.PDConsoleUtils(console)
        # Save the PD port number for this device
        self.port = port
        # Not a Plankton device
        self.is_plankton = False

    def is_src(self):
        """Checks if the port is connected as a source

        @returns True if connected as SRC, False otherwise
        """
        state = self.utils.get_pd_state(self.port)
        return bool(state == self.utils.SRC_CONNECT)

    def is_snk(self):
        """Checks if the port is connected as a sink

        @returns True if connected as SNK, False otherwise
        """
        state = self.utils.get_pd_state(self.port)
        return bool(state == self.utils.SNK_CONNECT)

    def is_connected(self):
        """Checks if the port is connected

        Overrides the base implementation so the PD state is only read
        from the console once.

        @returns True if in a connected state, False otherwise
        """
        state = self.utils.get_pd_state(self.port)
        return bool(state == self.utils.SNK_CONNECT or
                    state == self.utils.SRC_CONNECT)

    def is_disconnected(self):
        """Checks if the port is disconnected

        @returns True if in a disconnected state, False otherwise
        """
        state = self.utils.get_pd_state(self.port)
        return bool(state == self.utils.SRC_DISC or
                    state == self.utils.SNK_DISC)

    def is_drp(self):
        """Checks if dual role mode is supported

        @returns True if dual role mode is 'on', False otherwise
        """
        return self.utils.is_pd_dual_role_enabled()

    def drp_disconnect_connect(self, disc_time_sec):
        """Disconnect/reconnect using drp mode settings

        A PD console device doesn't have an explicit connect/disconnect
        command. Instead, the dualrole mode setting is used to force
        disconnects in devices which support this feature. To disconnect,
        force the dualrole mode to be the opposite role of the current
        connected state.

        @param disc_time_sec: time in seconds to wait to reconnect

        @returns True if device disconnects, then returns to a connected
                 state. False if either step fails.
        """
        # Dualrole mode must be supported
        if not self.is_drp():
            logging.warning(
                'Device not DRP capable, unabled to force disconnect')
            return False
        # Force state will be the opposite of current connect state. Any
        # state other than "connected as source" selects 'src'.
        drp_mode = 'snk' if self.is_src() else 'src'
        # Force disconnect
        self.drp_set(drp_mode)
        # Wait for disconnect time
        time.sleep(disc_time_sec)
        # Verify that the device is disconnected
        disconnect = self.is_disconnected()
        # Restore default dualrole mode
        self.drp_set('on')
        # Allow enough time for protocol state machine
        time.sleep(self.utils.CONNECT_TIME)
        # Check if connected
        connect = self.is_connected()
        logging.info('Disconnect = %r, Connect = %r', disconnect, connect)
        return bool(disconnect and connect)

    def drp_set(self, mode):
        """Sets dualrole mode

        @param mode: desired dual role setting (on, off, snk, src)

        @returns True if set was successful, False otherwise
        """
        # Set desired dualrole mode
        self.utils.set_pd_dualrole(mode)
        # Get the expected output
        resp = self.utils.dualrole_resp[self.utils.dual_index[mode]]
        # Get current setting
        current = self.utils.get_pd_dualrole()
        # Verify that setting is correct
        return bool(resp == current)

    def try_src(self, enable):
        """Enables/Disables Try.SRC PD protocol setting

        @param enable: True to enable, False to disable

        @returns True if setting was successful, False if feature not
                 supported by the device, or not set as desired.
        """
        # Create Try.SRC pd command
        cmd = 'pd trysrc %d' % int(enable)
        # Try.SRC on/off is output, if supported feature. The 'Parameter'
        # alternative lets the command match on consoles that reject it.
        regex = [r'Try\.SRC\s([\w]+)|(Parameter)']
        m = self.utils.send_pd_command_get_output(cmd, regex)
        # Determine if Try.SRC feature is supported
        trysrc = re.search(r'Try\.SRC\s([\w]+)', m[0][0])
        if not trysrc:
            logging.warning('Try.SRC not supported on this PD device')
            return False
        # TrySRC is supported on this PD device, verify setting.
        logging.info('Try.SRC mode = %s', trysrc.group(1))
        val = 'on' if enable else 'off'
        return bool(val == m[0][1])

    def soft_reset(self, states_list=None):
        """Initiates a PD soft reset sequence

        To verify that a soft reset sequence was initiated, the
        reply message is checked to verify that the reset command
        was acknowledged by its port pair. The connect state should
        be same as it was prior to issuing the reset command.

        @param states_list: list of expected PD state transitions. Accepted
                 for interface compatibility; this implementation does not
                 read it.

        @returns True if the port pair acknowledges the reset message
                 and if following the command, the device returns to the same
                 connected state. False otherwise.
        """
        # Seconds to wait after the reset before re-reading the PD state
        RESET_DELAY = 0.5
        cmd = 'pd %d soft' % self.port
        state_before = self.utils.get_pd_state(self.port)
        reply = self.utils.send_pd_command_get_reply_msg(cmd)
        if reply != self.utils.PD_CONTROL_MSG_DICT['Accept']:
            return False
        time.sleep(RESET_DELAY)
        state_after = self.utils.get_pd_state(self.port)
        return state_before == state_after

    def pr_swap(self):
        """Attempts a power role swap

        In order to attempt a power role swap the device must be
        connected and support dualrole mode. Once these two criteria
        are checked a power role command is issued. Following a delay
        to allow for a reconnection the new power role is checked
        against the power role prior to issuing the command.

        @returns True if the device has swapped power roles, False otherwise.
        """
        # Dualrole must already be on, or be switched on successfully
        if not self.is_drp() and not self.drp_set('on'):
            logging.warning('Dualrole Mode not enabled!')
            return False
        if not self.is_connected():
            logging.warning('PD contract not established!')
            return False
        current_pr = self.utils.get_pd_state(self.port)
        swap_cmd = 'pd %d swap power' % self.port
        self.utils.send_pd_command(swap_cmd)
        time.sleep(self.utils.CONNECT_TIME)
        new_pr = self.utils.get_pd_state(self.port)
        logging.info('Power swap: %s -> %s', current_pr, new_pr)
        if not self.is_connected():
            logging.warning('Device not connected following PR swap attempt.')
            return False
        return current_pr != new_pr
332
Scottc98bfd72016-02-22 16:18:53 -0800333
class PDPlanktonDevice(PDConsoleDevice):
    """Class for PD Plankton devices

    This class contains methods for PD functions which are unique to the
    Plankton Type C factory testing board. It inherits all the methods
    for PD console devices.
    """

    def __init__(self, console, port):
        """Initialization method

        @param console: UART console for this device
        @param port: USB PD port number. Currently not used: port 0 is
                 always passed to the PD console base class.
        """
        # Instantiate the PD console object
        super(PDPlanktonDevice, self).__init__(console, 0)
        # Indicate this is Plankton device
        self.is_plankton = True

    def _toggle_plankton_drp(self):
        """Issue 'usbc_action drp' Plankton command

        @returns value of drp_enable in Plankton FW, as an int
        """
        drp_cmd = 'usbc_action drp'
        drp_re = [r'DRP\s=\s(\d)']
        # Send DRP toggle command to Plankton and get value of 'drp_enable'
        m = self.utils.send_pd_command_get_output(drp_cmd, drp_re)
        return int(m[0][1])

    def _enable_plankton_drp(self):
        """Enable DRP mode on Plankton

        DRP mode can only be toggled and is not able to be explicitly
        enabled/disabled via the console. Therefore, this method will
        toggle DRP mode until the console reply indicates that this
        mode is enabled. The toggle happens a maximum of two times
        in case this is called when it's already enabled.

        @returns True when DRP mode is enabled, False if not successful
        """
        for _ in range(2):
            # The toggle reports the new drp_enable value; 1 means enabled
            if self._toggle_plankton_drp() == 1:
                logging.info('Plankton DRP mode enabled')
                return True
        logging.error('Plankton DRP mode set failure')
        return False

    def _verify_state_sequence(self, states_list, console_log):
        """Compare PD state transitions to expected values

        Each expected state is searched for independently in the console
        log; the relative order of the states is not checked.

        @param states_list: list of expected PD state transitions
        @param console_log: console output which contains state names

        @returns True if every expected state is found, False otherwise
        """
        # For each state in the expected state transition table, build
        # the regexp and search for it in the state transition log.
        for state in states_list:
            state_regx = r'C{0}\s+[\w]+:\s({1})'.format(self.port, state)
            if re.search(state_regx, console_log) is None:
                return False
        return True

    def cc_disconnect_connect(self, disc_time_sec):
        """Disconnect/reconnect using Plankton

        Plankton supports a feature which simulates a USB Type C disconnect
        and reconnect.

        @param disc_time_sec: Time in seconds for disconnect period.
        """
        # First argument of fake_disconnect; presumably a delay in
        # milliseconds given disc_time_sec is scaled by 1000 -- TODO confirm
        # against the Plankton firmware.
        DISC_DELAY = 100
        disc_cmd = 'fake_disconnect %d %d' % (DISC_DELAY,
                                              disc_time_sec * 1000)
        self.utils.send_pd_command(disc_cmd)

    def drp_set(self, mode):
        """Sets dualrole mode

        @param mode: desired dual role setting (on, off, snk, src)

        @returns True if dualrole mode matches the requested value or
                 is successfully set to that value. False, otherwise.
        """
        # Get correct dualrole console response
        resp = self.utils.dualrole_resp[self.utils.dual_index[mode]]
        # Get current value of dualrole
        drp = self.utils.get_pd_dualrole()
        if drp == resp:
            return True

        if mode == 'on':
            # Setting drp_enable on Plankton will set dualrole mode to on
            return self._enable_plankton_drp()

        # If desired setting is other than 'on', need to ensure that
        # drp mode on Plankton is disabled.
        # NOTE(review): resp is the expected response for the *requested*
        # mode, and this point is only reached when mode != 'on'. If
        # dualrole_resp maps only 'on' to 'on', this check can never be
        # true and may have been meant to test the current value (drp).
        # Left as-is pending confirmation on hardware.
        if resp == 'on':
            # This will turn off drp_enable flag and set dualmode to 'off'
            return self._toggle_plankton_drp()
        # With drp_enable flag off, can set to desired setting
        # NOTE(review): the result is whatever set_pd_dualrole() returns;
        # verify that it is truthy on success.
        return self.utils.set_pd_dualrole(mode)

    def soft_reset(self, states_list):
        """Initiates a PD soft reset sequence

        Plankton device has state names available on the console. When
        a soft reset is issued the console log is extracted and then
        compared against the expected state transitions.

        @param states_list: list of expected PD state transitions

        @returns True if state transitions match, False otherwise
        """
        cmd = 'pd %d soft' % self.port
        # Want to grab all output until either SRC_READY or SNK_READY
        reply_exp = [r'(.*)(C\d)\s+[\w]+:\s([\w]+_READY)']
        m = self.utils.send_pd_command_get_output(cmd, reply_exp)
        return self._verify_state_sequence(states_list, m[0][0])
454
Scottc98bfd72016-02-22 16:18:53 -0800455
class PDPortPartner(object):
    """Methods used to instantiate PD device objects

    This class is initialized with a list of servo consoles. It
    contains methods to determine if USB PD devices are accessible
    via the consoles and attempts to determine USB PD port partners.
    A PD device is USB PD port specific, a single console may access
    multiple PD devices.

    """

    def __init__(self, consoles):
        """Initialization method

        @param consoles: list of servo consoles
        """
        self.consoles = consoles

    def _send_pd_state(self, port, console):
        """Tests if PD device exists on a given port number

        @param port: USB PD port number to try
        @param console: servo UART console

        @returns True if 'pd <port> state' command gives a valid
                 response, False otherwise
        """
        cmd = 'pd %d state' % port
        # 'Parameter' is accepted too so the command still matches when
        # the console rejects the port number.
        regex = r'(Port C\d)|(Parameter)'
        m = console.send_command_get_output(cmd, [regex])
        # If PD port exists, then output will be Port C0 or C1
        port_regex = r'Port C{0}'.format(port)
        return bool(re.search(port_regex, m[0][0]))

    def _find_num_pd_ports(self, console):
        """Determine number of PD ports for a given console

        @param console: uart console accessed via servo

        @returns: number of PD ports accessible via console
        """
        MAX_PORTS = 2
        num_ports = 0
        for port in range(MAX_PORTS):
            if self._send_pd_state(port, console):
                num_ports += 1
        return num_ports

    def _is_pd_console(self, console):
        """Check if pd option exists in console

        @param console: uart console accessed via servo

        @returns: True if 'pd' is found, False otherwise
        """
        try:
            # Only success/failure of the match matters, not its content
            console.send_command_get_output('help', [r'(pd)\s+'])
            return True
        except error.TestFail:
            return False

    def _is_plankton_console(self, console):
        """Check for Plankton console

        This method looks for a console command option 'usbc_action' which
        is unique to Plankton PD devices.

        @param console: uart console accessed via servo

        @returns True if usbc_action command is present, False otherwise
        """
        try:
            # Only success/failure of the match matters, not its content
            console.send_command_get_output('help', [r'(usbc_action)'])
            return True
        except error.TestFail:
            return False

    def _check_port_pair(self, dev_pair):
        """Check if two PD devices could be connected

        If two USB PD devices are connected, then they should be in
        either the SRC_READY or SNK_READY states and have opposite
        power roles. In addition, they must be on different servo
        consoles.

        @param dev_pair: list of two possible PD port partners

        @returns True if not the same console and both PD devices
                 are a plausible pair based only on their PD states.
        """
        # Don't test if on the same servo console
        if dev_pair[0].console == dev_pair[1].console:
            logging.info('PD Devices are on same platform -> cant be a pair')
            return False
        # Must be SRC <--> SNK or SNK <--> SRC
        return bool((dev_pair[0].is_src() and dev_pair[1].is_snk()) or
                    (dev_pair[0].is_snk() and dev_pair[1].is_src()))

    def _verify_plankton_connection(self, dev_pair):
        """Verify DUT to Plankton PD connection

        This method checks for a Plankton PD connection for the
        given port by first verifying if a PD connection is present.
        If found, then it uses a Plankton feature to force a PD disconnect.
        If the port is no longer in the connected state, and following
        a delay, is found to be back in the connected state, then
        a DUT pd to Plankton connection is verified.

        @param dev_pair: list of two PD devices

        @returns True if DUT to Plankton pd connection is verified
        """
        DISC_CHECK_TIME = .5
        DISC_WAIT_TIME = 2
        CONNECT_TIME = 4

        if not self._check_port_pair(dev_pair):
            return False

        # Either device of the pair may be the Plankton, so try the
        # disconnect command on each of them in turn.
        for index, dev in enumerate(dev_pair):
            try:
                # Force PD disconnect
                dev.cc_disconnect_connect(DISC_WAIT_TIME)
                time.sleep(DISC_CHECK_TIME)
                # Verify that both devices are now disconnected
                if (dev_pair[0].is_disconnected() and
                        dev_pair[1].is_disconnected()):
                    # Allow enough time for reconnection
                    time.sleep(DISC_WAIT_TIME + CONNECT_TIME)
                    if self._check_port_pair(dev_pair):
                        # Have verified a pd disconnect/reconnect sequence
                        logging.info('Plankton <-> DUT pair found')
                        return True
                else:
                    # Delay to allow the forced disconnect to finish before
                    # the next attempt
                    time.sleep(DISC_WAIT_TIME + CONNECT_TIME)
            except NotImplementedError:
                # Non-Plankton devices do not implement cc_disconnect_connect
                logging.info('dev %d is not Plankton', index)
        return False

    def _find_port_pair(self, devices):
        """Find the first two devices verified as PD port partners

        Every unordered pair of devices is tried in list order. The search
        stops at the first verified pair; the devices list is not modified.

        @param devices: list of PD device objects

        @returns list of the two partner devices, or an empty list if no
                 pair could be verified
        """
        for idx, deva in enumerate(devices):
            for devb in devices[idx + 1:]:
                pair = [deva, devb]
                if self._verify_plankton_connection(pair):
                    return pair
        return []

    def identify_pd_devices(self):
        """Instantiate PD devices present in test setup

        @returns list of 2 PD devices if a DUT <-> Plankton found. If
                 not found, then returns an empty list.
        """
        devices = []
        # For each possible uart console, check to see if a PD console
        # is present and determine the number of PD ports.
        for console in self.consoles:
            if not self._is_pd_console(console):
                continue
            is_plank = self._is_plankton_console(console)
            num_ports = self._find_num_pd_ports(console)
            # For each PD port that can be accessed via the console,
            # instantiate either PDConsole or PDPlankton device.
            for port in range(num_ports):
                if is_plank:
                    logging.info('Plankton PD Device on port %d', port)
                    devices.append(PDPlanktonDevice(console, port))
                else:
                    devices.append(PDConsoleDevice(console, port))
                    logging.info('Console PD Device on port %d', port)

        # Determine PD port partners in the list of PD devices. Note, that
        # there can be PD devices which are not accessible via a uart console,
        # but are connected to a PD port which is accessible.
        return self._find_port_pair(devices)
634