blob: 7f9d5c9be170db23ed9f0c6b776c553139c91678 [file] [log] [blame]
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4
import itertools
import logging
import re
import time

from autotest_lib.client.common_lib import error
from autotest_lib.server.cros.servo import pd_console
11
12
class PDDevice(object):
    """Base class for all PD devices

    This class provides a set of APIs for expected Type C PD required actions
    in TypeC FAFT tests. The base class is specific for Type C devices that
    do not have any console access. Apart from is_connected(), which is
    derived from is_src()/is_snk(), every method here raises
    NotImplementedError and is expected to be overridden by a derived class.

    """

    def is_src(self):
        """Checks if the port is connected as a source

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'is_src should be implemented in derived class')

    def is_snk(self):
        """Checks if the port is connected as a sink

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'is_snk should be implemented in derived class')

    def is_connected(self):
        """Checks if the port is connected

        Relies on is_src() and is_snk(), so it only works in a derived
        class which implements both of them.

        @returns True if in a connected state, False otherwise
        """
        return self.is_src() or self.is_snk()

    def is_disconnected(self):
        """Checks if the port is disconnected

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'is_disconnected should be implemented in derived class')

    def is_ufp(self):
        """Checks if data role is UFP

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'is_ufp should be implemented in derived class')

    def is_dfp(self):
        """Checks if data role is DFP

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'is_dfp should be implemented in derived class')

    def is_drp(self):
        """Checks if dual role mode is supported

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'is_drp should be implemented in derived class')

    def dr_swap(self):
        """Attempts a data role swap

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'dr_swap should be implemented in derived class')

    def pr_swap(self):
        """Attempts a power role swap

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'pr_swap should be implemented in derived class')

    def vbus_request(self, voltage):
        """Requests a specific VBUS voltage from SRC

        @param voltage: requested voltage level (5, 12, 20) in volts

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'vbus_request should be implemented in derived class')

    def soft_reset(self, states_list=None):
        """Initiates a PD soft reset sequence

        @param states_list: list of expected PD state transitions

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'soft_reset should be implemented in derived class')

    def hard_reset(self):
        """Initiates a PD hard reset sequence

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'hard_reset should be implemented in derived class')

    def drp_set(self, mode):
        """Sets dualrole mode

        @param mode: desired dual role setting (on, off, snk, src)

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'drp_set should be implemented in derived class')

    def drp_disconnect_connect(self, disc_time_sec):
        """Force PD disconnect/connect via drp settings

        @param disc_time_sec: Time in seconds between disconnect and reconnect

        @raises NotImplementedError: always, in this base class
        """
        # The message names the actual method and is built without a
        # backslash continuation, which would otherwise pull the source
        # indentation into the message text.
        raise NotImplementedError(
                'drp_disconnect_connect should be implemented in '
                'derived class')

    def cc_disconnect_connect(self, disc_time_sec):
        """Force PD disconnect/connect using Plankton fw command

        @param disc_time_sec: Time in seconds between disconnect and reconnect

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'cc_disconnect_connect should be implemented in '
                'derived class')
134
135
class PDConsoleDevice(PDDevice):
    """Class for PD devices that have console access

    This class contains methods for common PD actions for any PD device which
    has UART console access. It inherits the PD device base class. In addition,
    it stores both the UART console and port for the PD device.
    """

    def __init__(self, console, port):
        """Initialization method

        @param console: UART console object
        @param port: USB PD port number
        """
        # Save UART console
        self.console = console
        # Instantiate PD utilities used by methods in this class
        self.utils = pd_console.PDConsoleUtils(console)
        # Save the PD port number for this device
        self.port = port
        # Not a Plankton device
        self.is_plankton = False

    def is_src(self):
        """Checks if the port is connected as a source

        @returns True if connected as SRC, False otherwise
        """
        state = self.utils.get_pd_state(self.port)
        return bool(state == self.utils.SRC_CONNECT)

    def is_snk(self):
        """Checks if the port is connected as a sink

        @returns True if connected as SNK, False otherwise
        """
        state = self.utils.get_pd_state(self.port)
        return bool(state == self.utils.SNK_CONNECT)

    def is_connected(self):
        """Checks if the port is connected

        Overrides the base class version so that the PD state is queried
        from the console only once.

        @returns True if in a connected state, False otherwise
        """
        state = self.utils.get_pd_state(self.port)
        return bool(state == self.utils.SNK_CONNECT or
                    state == self.utils.SRC_CONNECT)

    def is_disconnected(self):
        """Checks if the port is disconnected

        @returns True if in a disconnected state, False otherwise
        """
        state = self.utils.get_pd_state(self.port)
        return bool(state == self.utils.SRC_DISC or
                    state == self.utils.SNK_DISC)

    def is_drp(self):
        """Checks if dual role mode is supported

        @returns True if dual role mode is 'on', False otherwise
        """
        return self.utils.is_pd_dual_role_enabled()

    def drp_disconnect_connect(self, disc_time_sec):
        """Disconnect/reconnect using drp mode settings

        A PD console device doesn't have an explicit connect/disconnect
        command. Instead, the dualrole mode setting is used to force
        disconnects in devices which support this feature. To disconnect,
        force the dualrole mode to be the opposite role of the current
        connected state.

        @param disc_time_sec: time in seconds to wait to reconnect

        @returns True if device disconnects, then returns to a connected
                 state. False if either step fails.
        """
        # Dualrole mode must be supported
        if not self.is_drp():
            logging.warning(
                    'Device not DRP capable, unabled to force disconnect')
            return False
        # Force state will be the opposite of current connect state
        drp_mode = 'snk' if self.is_src() else 'src'
        try:
            # Force disconnect
            self.drp_set(drp_mode)
            # Wait for disconnect time
            time.sleep(disc_time_sec)
            # Verify that the device is disconnected
            disconnect = self.is_disconnected()
        finally:
            # Restore default dualrole mode, even if one of the steps above
            # raised, so that the port is not left in a forced role.
            self.drp_set('on')
        # Allow enough time for protocol state machine
        time.sleep(self.utils.CONNECT_TIME)
        # Check if connected
        connect = self.is_connected()
        logging.info('Disconnect = %r, Connect = %r', disconnect, connect)
        return bool(disconnect and connect)

    def drp_set(self, mode):
        """Sets dualrole mode

        @param mode: desired dual role setting (on, off, snk, src)

        @returns True if set was successful, False otherwise
        """
        # Set desired dualrole mode
        self.utils.set_pd_dualrole(mode)
        # Get the expected output
        resp = self.utils.dualrole_resp[self.utils.dual_index[mode]]
        # Get current setting
        current = self.utils.get_pd_dualrole()
        # Verify that setting is correct
        return bool(resp == current)

    def try_src(self, enable):
        """Enables/Disables Try.SRC PD protocol setting

        @param enable: True to enable, False to disable

        @returns True if setting was successful, False if feature not
                 supported by the device, or not set as desired.
        """
        # Create Try.SRC pd command
        cmd = 'pd trysrc %d' % int(enable)
        # Try.SRC on/off is output, if supported feature
        regex = [r'Try\.SRC\s([\w]+)|(Parameter)']
        m = self.utils.send_pd_command_get_output(cmd, regex)
        # Determine if Try.SRC feature is supported
        trysrc = re.search(r'Try\.SRC\s([\w]+)', m[0][0])
        if not trysrc:
            logging.warning('Try.SRC not supported on this PD device')
            return False
        # TrySRC is supported on this PD device, verify setting.
        logging.info('Try.SRC mode = %s', trysrc.group(1))
        val = 'on' if enable else 'off'
        return bool(val == m[0][1])

    def soft_reset(self, states_list=None):
        """Initiates a PD soft reset sequence

        To verify that a soft reset sequence was initiated, the
        reply message is checked to verify that the reset command
        was acknowledged by its port pair. The connect state should
        be same as it was prior to issuing the reset command.

        @param states_list: list of expected PD state transitions. Not used
                 by this class; accepted so that the signature matches the
                 base class.

        @returns True if the port pair acknowledges the reset message
                 and if following the command, the device returns to the
                 same connected state. False otherwise.
        """
        RESET_DELAY = 0.5
        cmd = 'pd %d soft' % self.port
        state_before = self.utils.get_pd_state(self.port)
        reply = self.utils.send_pd_command_get_reply_msg(cmd)
        if reply != self.utils.PD_CONTROL_MSG_DICT['Accept']:
            return False
        time.sleep(RESET_DELAY)
        state_after = self.utils.get_pd_state(self.port)
        return state_before == state_after

    def hard_reset(self):
        """Initiates a PD hard reset sequence

        To verify that a hard reset sequence was initiated, the
        console output is scanned for HARD RST TX. In addition, the connect
        state should be same as it was prior to issuing the reset command.

        @returns True if the port pair acknowledges that hard reset was
                 initiated and if following the command, the device returns
                 to the same connected state. False otherwise.
        """
        RESET_DELAY = 1.0
        cmd = 'pd %d hard' % self.port
        state_before = self.utils.get_pd_state(self.port)
        self.utils.enable_pd_console_debug()
        try:
            self.utils.send_pd_command_get_output(cmd,
                                                  [r'.*(HARD\sRST\sTX)'])
        except error.TestFail:
            logging.warning('HARD RST TX not found')
            return False
        finally:
            # Console debug is switched back off on every exit path
            self.utils.disable_pd_console_debug()

        time.sleep(RESET_DELAY)
        state_after = self.utils.get_pd_state(self.port)
        return state_before == state_after

    def pr_swap(self):
        """Attempts a power role swap

        In order to attempt a power role swap the device must be
        connected and support dualrole mode. Once these two criteria
        are checked a power role command is issued. Following a delay
        to allow for a reconnection the new power role is checked
        against the power role prior to issuing the command.

        @returns True if the device has swapped power roles, False otherwise.
        """
        # Dualrole mode must be on; try to turn it on if it is not
        if not self.is_drp() and not self.drp_set('on'):
            logging.warning('Dualrole Mode not enabled!')
            return False
        if not self.is_connected():
            logging.warning('PD contract not established!')
            return False
        current_pr = self.utils.get_pd_state(self.port)
        swap_cmd = 'pd %d swap power' % self.port
        self.utils.send_pd_command(swap_cmd)
        time.sleep(self.utils.CONNECT_TIME)
        new_pr = self.utils.get_pd_state(self.port)
        logging.info('Power swap: %s -> %s', current_pr, new_pr)
        if not self.is_connected():
            logging.warning(
                    'Device not connected following PR swap attempt.')
            return False
        return current_pr != new_pr
359
Scottc98bfd72016-02-22 16:18:53 -0800360
class PDPlanktonDevice(PDConsoleDevice):
    """Class for PD Plankton devices

    This class contains methods for PD functions which are unique to the
    Plankton Type C factory testing board. It inherits all the methods
    for PD console devices.
    """

    def __init__(self, console, port):
        """Initialization method

        @param console: UART console for this device
        @param port: USB PD port number. Accepted for signature
                 compatibility with PDConsoleDevice, but not used: the
                 device is always created on port 0.
        """
        # Instantiate the PD console object
        super(PDPlanktonDevice, self).__init__(console, 0)
        # Indicate this is Plankton device
        self.is_plankton = True

    def _toggle_plankton_drp(self):
        """Issue 'usbc_action drp' Plankton command

        @returns value of drp_enable in Plankton FW
        """
        drp_cmd = 'usbc_action drp'
        drp_re = [r'DRP\s=\s(\d)']
        # Send DRP toggle command to Plankton and get value of 'drp_enable'
        m = self.utils.send_pd_command_get_output(drp_cmd, drp_re)
        return int(m[0][1])

    def _enable_plankton_drp(self):
        """Enable DRP mode on Plankton

        DRP mode can only be toggled and is not able to be explicitly
        enabled/disabled via the console. Therefore, this method will
        toggle DRP mode until the console reply indicates that this
        mode is enabled. The toggle happens a maximum of two times
        in case this is called when it's already enabled.

        @returns True when DRP mode is enabled, False if not successful
        """
        for _ in range(2):
            if self._toggle_plankton_drp() == 1:
                logging.info('Plankton DRP mode enabled')
                return True
        logging.error('Plankton DRP mode set failure')
        return False

    def _disable_plankton_drp(self):
        """Disable DRP mode on Plankton

        Counterpart of _enable_plankton_drp(): toggles DRP mode until the
        console reply indicates that it is disabled, a maximum of two
        times in case this is called when it's already disabled.

        @returns True when DRP mode is disabled, False if not successful
        """
        for _ in range(2):
            if self._toggle_plankton_drp() == 0:
                logging.info('Plankton DRP mode disabled')
                return True
        logging.error('Plankton DRP mode clear failure')
        return False

    def _verify_state_sequence(self, states_list, console_log):
        """Compare PD state transitions to expected values

        Each expected state only has to appear somewhere in the log for
        this port; the relative order of the states is not checked.

        @param states_list: list of expected PD state transitions. None is
                 treated as an empty list.
        @param console_log: console output which contains state names

        @returns True if every expected state is found, False otherwise
        """
        # For each state in the expected state transition table, build
        # the regexp and search for it in the state transition log.
        for state in states_list or []:
            state_regx = r'C{0}\s+[\w]+:\s({1})'.format(self.port, state)
            if re.search(state_regx, console_log) is None:
                return False
        return True

    def cc_disconnect_connect(self, disc_time_sec):
        """Disconnect/reconnect using Plankton

        Plankton supports a feature which simulates a USB Type C disconnect
        and reconnect.

        @param disc_time_sec: Time in seconds for disconnect period.
        """
        DISC_DELAY = 100
        # disc_time_sec is scaled by 1000 before being sent; presumably the
        # command takes milliseconds -- confirm against Plankton firmware.
        disc_cmd = 'fake_disconnect %d %d' % (DISC_DELAY,
                                              disc_time_sec * 1000)
        self.utils.send_pd_command(disc_cmd)

    def drp_set(self, mode):
        """Sets dualrole mode

        @param mode: desired dual role setting (on, off, snk, src)

        @returns True if dualrole mode matches the requested value or
                 is successfully set to that value. False, otherwise.
        """
        # Get correct dualrole console response
        resp = self.utils.dualrole_resp[self.utils.dual_index[mode]]
        # Get current value of dualrole
        drp = self.utils.get_pd_dualrole()
        if drp == resp:
            return True

        if mode == 'on':
            # Setting drp_enable on Plankton will set dualrole mode to on
            return self._enable_plankton_drp()

        # If desired setting is other than 'on', need to ensure that
        # drp mode on Plankton is disabled. The check is made against the
        # current setting (drp): the expected response (resp) is never 'on'
        # in this branch.
        if drp == 'on' and not self._disable_plankton_drp():
            return False
        # With drp_enable flag off, can set to desired setting
        self.utils.set_pd_dualrole(mode)
        # Read the setting back so that the return value reflects whether
        # the requested mode is actually in effect.
        return bool(self.utils.get_pd_dualrole() == resp)

    def _reset(self, cmd, states_list):
        """Initiates a PD reset sequence

        Plankton device has state names available on the console. When
        a reset is issued the console log is extracted and then
        compared against the expected state transitions.

        @param cmd: console command which triggers the soft or hard reset
        @param states_list: list of expected PD state transitions

        @returns True if state transitions match, False otherwise
        """
        # Want to grab all output until either SRC_READY or SNK_READY
        reply_exp = [r'(.*)(C\d)\s+[\w]+:\s([\w]+_READY)']
        m = self.utils.send_pd_command_get_output(cmd, reply_exp)
        return self._verify_state_sequence(states_list, m[0][0])

    def soft_reset(self, states_list=None):
        """Initiates a PD soft reset sequence

        @param states_list: list of expected PD state transitions. When
                 omitted there are no transitions to check.

        @returns True if state transitions match, False otherwise
        """
        cmd = 'pd %d soft' % self.port
        return self._reset(cmd, states_list)

    def hard_reset(self):
        """Initiates a PD hard reset sequence

        The expected state transitions are chosen according to the current
        power role of the port.

        @returns True if state transitions match, False otherwise

        @raises error.TestFail: if the port is neither SRC nor SNK connected
        """
        snk_reset_states = [
            'HARD_RESET_SEND',
            'HARD_RESET_EXECUTE',
            'SNK_HARD_RESET_RECOVER',
            'SNK_DISCOVERY',
            'SNK_REQUESTED',
            'SNK_TRANSITION',
            'SNK_READY'
        ]

        src_reset_states = [
            'HARD_RESET_SEND',
            'HARD_RESET_EXECUTE',
            'SRC_HARD_RESET_RECOVER',
            'SRC_DISCOVERY',
            'SRC_NEGOCIATE',
            'SRC_ACCEPTED',
            'SRC_POWERED',
            'SRC_TRANSITION',
            'SRC_READY'
        ]

        if self.is_src():
            states_list = src_reset_states
        elif self.is_snk():
            states_list = snk_reset_states
        else:
            raise error.TestFail('Port Pair not in a connected state')

        cmd = 'pd %d hard' % self.port
        return self._reset(cmd, states_list)
Scottcae6fe42016-02-08 16:26:17 -0800529
Scottc98bfd72016-02-22 16:18:53 -0800530
class PDPortPartner(object):
    """Methods used to instantiate PD device objects

    This class is initialized with a list of servo consoles. It
    contains methods to determine if USB PD devices are accessible
    via the consoles and attempts to determine USB PD port partners.
    A PD device is USB PD port specific, a single console may access
    multiple PD devices.

    """

    def __init__(self, consoles):
        """Initialization method

        @param consoles: list of servo consoles
        """
        self.consoles = consoles

    def _send_pd_state(self, port, console):
        """Tests if PD device exists on a given port number

        @param port: USB PD port number to try
        @param console: servo UART console

        @returns True if 'pd <port> state' command gives a valid
                 response, False otherwise
        """
        cmd = 'pd %d state' % port
        regex = r'(Port C\d)|(Parameter)'
        m = console.send_command_get_output(cmd, [regex])
        # If PD port exists, then output will be Port C0 or C1
        port_regex = r'Port C{0}'.format(port)
        return bool(re.search(port_regex, m[0][0]))

    def _find_num_pd_ports(self, console):
        """Determine number of PD ports for a given console

        @param console: uart console accessed via servo

        @returns: number of PD ports accessible via console
        """
        MAX_PORTS = 2
        num_ports = 0
        for port in range(MAX_PORTS):
            if self._send_pd_state(port, console):
                num_ports += 1
        return num_ports

    def _is_pd_console(self, console):
        """Check if pd option exists in console

        @param console: uart console accessed via servo

        @returns: True if 'pd' is found, False otherwise
        """
        try:
            console.send_command_get_output('help', [r'(pd)\s+'])
            return True
        except error.TestFail:
            return False

    def _is_plankton_console(self, console):
        """Check for Plankton console

        This method looks for a console command option 'usbc_action' which
        is unique to Plankton PD devices.

        @param console: uart console accessed via servo

        @returns True if usbc_action command is present, False otherwise
        """
        try:
            console.send_command_get_output('help', [r'(usbc_action)'])
            return True
        except error.TestFail:
            return False

    def _check_port_pair(self, dev_pair):
        """Check if two PD devices could be connected

        If two USB PD devices are connected, then they should be in
        either the SRC_READY or SNK_READY states and have opposite
        power roles. In addition, they must be on different servo
        consoles.

        @param dev_pair: list of two possible PD port partners

        @returns True if not the same console and both PD devices
                 are a plausible pair based only on their PD states.
        """
        # Don't test if on the same servo console
        if dev_pair[0].console == dev_pair[1].console:
            logging.info('PD Devices are on same platform -> cant be a pair')
            return False
        # Must be SRC <--> SNK or SNK <--> SRC
        return bool((dev_pair[0].is_src() and dev_pair[1].is_snk()) or
                    (dev_pair[0].is_snk() and dev_pair[1].is_src()))

    def _verify_plankton_connection(self, dev_pair):
        """Verify DUT to Plankton PD connection

        This method checks for a Plankton PD connection for the
        given port by first verifying if a PD connection is present.
        If found, then it uses a Plankton feature to force a PD disconnect.
        If the port is no longer in the connected state, and following
        a delay, is found to be back in the connected state, then
        a DUT pd to Plankton connection is verified.

        @param dev_pair: list of two PD devices

        @returns True if DUT to Plankton pd connection is verified
        """
        DISC_CHECK_TIME = .5
        DISC_WAIT_TIME = 2
        CONNECT_TIME = 4

        if not self._check_port_pair(dev_pair):
            return False

        # Either device of the pair may be the Plankton, so the forced
        # disconnect is attempted from each of them in turn.
        for index, dev in enumerate(dev_pair):
            try:
                # Force PD disconnect
                dev.cc_disconnect_connect(DISC_WAIT_TIME)
                time.sleep(DISC_CHECK_TIME)
                # Verify that both devices are now disconnected
                if (dev_pair[0].is_disconnected() and
                        dev_pair[1].is_disconnected()):
                    # Allow enough time for reconnection
                    time.sleep(DISC_WAIT_TIME + CONNECT_TIME)
                    if self._check_port_pair(dev_pair):
                        # Have verified a pd disconnect/reconnect sequence
                        logging.info('Plankton <-> DUT pair found')
                        return True
                else:
                    # Delay to allow the forced disconnect period to
                    # run out before the next attempt
                    time.sleep(DISC_WAIT_TIME + CONNECT_TIME)
            except NotImplementedError:
                # Devices without cc_disconnect_connect() support raise
                # NotImplementedError from the PDDevice base class.
                logging.info('dev %d is not Plankton', index)
        return False

    def identify_pd_devices(self):
        """Instantiate PD devices present in test setup

        @returns list of 2 PD devices if a DUT <-> Plankton found. If
                 not found, then returns an empty list.
        """
        devices = []
        # For each possible uart console, check to see if a PD console
        # is present and determine the number of PD ports.
        for console in self.consoles:
            if not self._is_pd_console(console):
                continue
            is_plank = self._is_plankton_console(console)
            num_ports = self._find_num_pd_ports(console)
            # For each PD port that can be accessed via the console,
            # instantiate either PDConsole or PDPlankton device.
            for port in range(num_ports):
                if is_plank:
                    logging.info('Plankton PD Device on port %d', port)
                    devices.append(PDPlanktonDevice(console, port))
                else:
                    devices.append(PDConsoleDevice(console, port))
                    logging.info('Console PD Device on port %d', port)

        # Determine PD port partners in the list of PD devices. Note, that
        # there can be PD devices which are not accessible via a uart console,
        # but are connected to a PD port which is accessible.
        #
        # Every unordered pair is tried once, in list order, and the search
        # stops at the first verified pair. The device list is never
        # modified while it is being walked, and no further forced
        # disconnects are issued once the pair is known.
        for deva, devb in itertools.combinations(devices, 2):
            pair = [deva, devb]
            if self._verify_plankton_connection(pair):
                return pair
        return []
709