blob: f69a2216eb639f383828cd24d06a471e5f6a08dd [file] [log] [blame]
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4
5import re
6import logging
7import time
8
9from autotest_lib.client.common_lib import error
10from autotest_lib.server.cros.servo import pd_console
11
12
class PDDevice(object):
    """Base class for all PD devices

    This class provides a set of APIs for expected Type C PD required actions
    in TypeC FAFT tests. The base class is specific for Type C devices that
    do not have any console access.

    Every method except is_connected() raises NotImplementedError here and
    is expected to be overridden by a derived class.
    """

    def is_src(self):
        """Checks if the port is connected as a source

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'is_src should be implemented in derived class')

    def is_snk(self):
        """Checks if the port is connected as a sink

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'is_snk should be implemented in derived class')

    def is_connected(self):
        """Checks if the port is connected

        Built on top of is_src() and is_snk(), so it only works once a
        derived class has implemented both of them.

        @returns True if in a connected state, False otherwise
        """
        return self.is_src() or self.is_snk()

    def is_disconnected(self):
        """Checks if the port is disconnected

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'is_disconnected should be implemented in derived class')

    def is_ufp(self):
        """Checks if data role is UFP

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'is_ufp should be implemented in derived class')

    def is_dfp(self):
        """Checks if data role is DFP

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'is_dfp should be implemented in derived class')

    def is_drp(self):
        """Checks if dual role mode is supported

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'is_drp should be implemented in derived class')

    def dr_swap(self):
        """Attempts a data role swap

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'dr_swap should be implemented in derived class')

    def pr_swap(self):
        """Attempts a power role swap

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'pr_swap should be implemented in derived class')

    def vbus_request(self, voltage):
        """Requests a specific VBUS voltage from SRC

        @param voltage: requested voltage level (5, 12, 20) in volts

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'vbus_request should be implemented in derived class')

    def soft_reset(self):
        """Initiates a PD soft reset sequence

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'soft_reset should be implemented in derived class')

    def hard_reset(self):
        """Initiates a PD hard reset sequence

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'hard_reset should be implemented in derived class')

    def drp_set(self, mode):
        """Sets dualrole mode

        @param mode: desired dual role setting (on, off, snk, src)

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'drp_set should be implemented in derived class')

    def drp_disconnect_connect(self, disc_time_sec):
        """Force PD disconnect/connect via drp settings

        @param disc_time_sec: Time in seconds between disconnect and reconnect

        @raises NotImplementedError: always, in this base class
        """
        # The message names the method as it is actually spelled and is built
        # from adjacent literals so no source indentation leaks into it.
        raise NotImplementedError(
                'drp_disconnect_connect should be implemented in '
                'derived class')

    def cc_disconnect_connect(self, disc_time_sec):
        """Force PD disconnect/connect using Plankton fw command

        @param disc_time_sec: Time in seconds between disconnect and reconnect

        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError(
                'cc_disconnect_connect should be implemented in '
                'derived class')
133
134
class PDConsoleDevice(PDDevice):
    """Class for PD devices that have console access

    This class contains methods for common PD actions for any PD device which
    has UART console access. It inherits the PD device base class. In addition,
    it stores both the UART console and port for the PD device.
    """

    def __init__(self, console, port):
        """Initialization method

        @param console: UART console object
        @param port: USB PD port number
        """
        # Save UART console
        self.console = console
        # Instantiate PD utilities used by methods in this class
        self.utils = pd_console.PDConsoleUtils(console)
        # Save the PD port number for this device
        self.port = port
        # Not a Plankton device
        self.is_plankton = False

    def is_src(self):
        """Checks if the port is connected as a source

        @returns True if connected as SRC, False otherwise
        """
        state = self.utils.get_pd_state(self.port)
        return bool(state == self.utils.SRC_CONNECT)

    def is_snk(self):
        """Checks if the port is connected as a sink

        @returns True if connected as SNK, False otherwise
        """
        state = self.utils.get_pd_state(self.port)
        return bool(state == self.utils.SNK_CONNECT)

    def is_connected(self):
        """Checks if the port is connected

        Reads the PD state once and compares it against both connected
        states, rather than calling is_src() and is_snk() separately.

        @returns True if in a connected state, False otherwise
        """
        state = self.utils.get_pd_state(self.port)
        return bool(state == self.utils.SNK_CONNECT or
                    state == self.utils.SRC_CONNECT)

    def is_disconnected(self):
        """Checks if the port is disconnected

        @returns True if in a disconnected state, False otherwise
        """
        state = self.utils.get_pd_state(self.port)
        return bool(state == self.utils.SRC_DISC or
                    state == self.utils.SNK_DISC)

    def is_drp(self):
        """Checks if dual role mode is supported

        @returns True if dual role mode is 'on', False otherwise
        """
        return self.utils.is_pd_dual_role_enabled()

    def drp_disconnect_connect(self, disc_time_sec):
        """Disconnect/reconnect using drp mode settings

        A PD console device doesn't have an explicit connect/disconnect
        command. Instead, the dualrole mode setting is used to force
        disconnects in devices which support this feature. To disconnect,
        force the dualrole mode to be the opposite role of the current
        connected state.

        @param disc_time_sec: time in seconds to wait to reconnect

        @returns True if device disconnects, then returns to a connected
                 state. False if either step fails.
        """
        # Dualrole mode must be supported
        if self.is_drp() is False:
            logging.warning(
                    'Device not DRP capable, unabled to force disconnect')
            return False
        # Force state will be the opposite of current connect state
        if self.is_src():
            drp_mode = 'snk'
            swap_state = self.utils.SNK_CONNECT
        else:
            drp_mode = 'src'
            swap_state = self.utils.SRC_CONNECT
        # Force disconnect
        self.drp_set(drp_mode)
        # Wait for disconnect time
        time.sleep(disc_time_sec)
        # Verify that the device is disconnected
        disconnect = self.is_disconnected()

        # If the other device is dualrole, then forcing dualrole mode will
        # only cause the disconnect to appear momentarily and reconnect
        # in the power role forced by the drp_set() call. For this case,
        # the role swap verifies that a disconnect/connect sequence occurred.
        if not disconnect:
            time.sleep(self.utils.CONNECT_TIME)
            # Connected, verify if power role swap has occurred
            if swap_state == self.utils.get_pd_state(self.port):
                # Restore default dualrole mode
                self.drp_set('on')
                # Restore original power role
                connect = self.pr_swap()
                if not connect:
                    logging.warning(
                            'DRP on both devices, 2nd power swap failed')
                return connect

        # Restore default dualrole mode
        self.drp_set('on')
        # Allow enough time for protocol state machine
        time.sleep(self.utils.CONNECT_TIME)
        # Check if connected
        connect = self.is_connected()
        logging.info('Disconnect = %r, Connect = %r', disconnect, connect)
        return bool(disconnect and connect)

    def drp_set(self, mode):
        """Sets dualrole mode

        @param mode: desired dual role setting (on, off, snk, src)

        @returns True if set was successful, False otherwise
        """
        # Set desired dualrole mode
        self.utils.set_pd_dualrole(mode)
        # Get the expected output
        resp = self.utils.dualrole_resp[self.utils.dual_index[mode]]
        # Get current setting
        current = self.utils.get_pd_dualrole()
        # Verify that setting is correct
        return bool(resp == current)

    def try_src(self, enable):
        """Enables/Disables Try.SRC PD protocol setting

        @param enable: True to enable, False to disable

        @returns True if setting was successful, False if feature not
                 supported by the device, or not set as desired.
        """
        # Create Try.SRC pd command
        cmd = 'pd trysrc %d' % int(enable)
        # Try.SRC on/off is output, if supported feature. The alternative
        # '(Parameter)' lets the match succeed on devices that reject the
        # command, so that case can be reported below.
        regex = [r'Try\.SRC\s([\w]+)|(Parameter)']
        m = self.utils.send_pd_command_get_output(cmd, regex)
        # Determine if Try.SRC feature is supported
        trysrc = re.search(r'Try\.SRC\s([\w]+)', m[0][0])
        if not trysrc:
            logging.warning('Try.SRC not supported on this PD device')
            return False
        # TrySRC is supported on this PD device, verify setting.
        logging.info('Try.SRC mode = %s', trysrc.group(1))
        val = 'on' if enable else 'off'
        return bool(val == m[0][1])

    def soft_reset(self):
        """Initiates a PD soft reset sequence

        To verify that a soft reset sequence was initiated, the
        reply message is checked to verify that the reset command
        was acknowledged by its port pair. The connect state should
        be same as it was prior to issuing the reset command.

        @returns True if the port pair acknowledges the reset message
                 and if following the command, the device returns to the
                 same connected state. False otherwise.
        """
        # Seconds to wait after the reset before re-reading the PD state
        RESET_DELAY = 0.5
        cmd = 'pd %d soft' % self.port
        state_before = self.utils.get_pd_state(self.port)
        reply = self.utils.send_pd_command_get_reply_msg(cmd)
        if reply != self.utils.PD_CONTROL_MSG_DICT['Accept']:
            return False
        time.sleep(RESET_DELAY)
        state_after = self.utils.get_pd_state(self.port)
        return state_before == state_after

    def hard_reset(self):
        """Initiates a PD hard reset sequence

        To verify that a hard reset sequence was initiated, the
        console output is scanned for HARD RST TX. In addition, the connect
        state should be same as it was prior to issuing the reset command.

        @returns True if the port pair acknowledges that hard reset was
                 initiated and if following the command, the device returns
                 to the same connected state. False otherwise.
        """
        # Seconds to wait after the reset before re-reading the PD state
        RESET_DELAY = 1.0
        cmd = 'pd %d hard' % self.port
        state_before = self.utils.get_pd_state(self.port)
        self.utils.enable_pd_console_debug()
        try:
            self.utils.send_pd_command_get_output(cmd, [r'.*(HARD\sRST\sTX)'])
        except error.TestFail:
            logging.warning('HARD RST TX not found')
            return False
        finally:
            # Console debug is switched back off on every exit path
            self.utils.disable_pd_console_debug()

        time.sleep(RESET_DELAY)
        state_after = self.utils.get_pd_state(self.port)
        return state_before == state_after

    def pr_swap(self):
        """Attempts a power role swap

        In order to attempt a power role swap the device must be
        connected and support dualrole mode. Once these two criteria
        are checked a power role command is issued. Following a delay
        to allow for a reconnection the new power role is checked
        against the power role prior to issuing the command.

        @returns True if the device has swapped power roles, False otherwise.
        """
        # Dualrole must be enabled already, or be successfully switched on
        if not self.is_drp() and not self.drp_set('on'):
            logging.warning('Dualrole Mode not enabled!')
            return False
        if not self.is_connected():
            logging.warning('PD contract not established!')
            return False
        # Get starting state
        current_pr = self.utils.get_pd_state(self.port)
        swap_cmd = 'pd %d swap power' % self.port
        self.utils.send_pd_command(swap_cmd)
        time.sleep(self.utils.CONNECT_TIME)
        new_pr = self.utils.get_pd_state(self.port)
        logging.info('Power swap: %s -> %s', current_pr, new_pr)
        if not self.is_connected():
            logging.warning('Device not connected following PR swap attempt.')
            return False
        return current_pr != new_pr
375
Scottc98bfd72016-02-22 16:18:53 -0800376
class PDPlanktonDevice(PDConsoleDevice):
    """Class for PD Plankton devices

    This class contains methods for PD functions which are unique to the
    Plankton Type C factory testing board. It inherits all the methods
    for PD console devices.
    """

    def __init__(self, console, port):
        """Initialization method

        @param console: UART console for this device
        @param port: USB PD port number
        """
        # Instantiate the PD console object.
        # NOTE(review): the port argument is not forwarded; port 0 is always
        # passed to the parent class -- confirm this is intended.
        super(PDPlanktonDevice, self).__init__(console, 0)
        # Indicate this is Plankton device
        self.is_plankton = True

    def _toggle_plankton_drp(self):
        """Issue 'usbc_action drp' Plankton command

        @returns value of drp_enable in Plankton FW, as an int
        """
        drp_cmd = 'usbc_action drp'
        drp_re = [r'DRP\s=\s(\d)']
        # Send DRP toggle command to Plankton and get value of 'drp_enable'
        m = self.utils.send_pd_command_get_output(drp_cmd, drp_re)
        return int(m[0][1])

    def _enable_plankton_drp(self):
        """Enable DRP mode on Plankton

        DRP mode can only be toggled and is not able to be explicitly
        enabled/disabled via the console. Therefore, this method will
        toggle DRP mode until the console reply indicates that this
        mode is enabled. The toggle happens a maximum of two times
        in case this is called when it's already enabled.

        @returns True when DRP mode is enabled, False if not successful
        """
        for _ in range(2):
            if self._toggle_plankton_drp() == 1:
                logging.info('Plankton DRP mode enabled')
                return True
        logging.error('Plankton DRP mode set failure')
        return False

    def _verify_state_sequence(self, states_list, console_log):
        """Compare PD state transitions to expected values

        Each expected state is searched for independently anywhere in the
        log; the relative order of the matches is not checked.

        @param states_list: list of expected PD state transitions
        @param console_log: console output which contains state names

        @returns True if every expected state is found, False otherwise
        """
        # For each state in the expected state transition table, build
        # the regexp and search for it in the state transition log.
        for state in states_list:
            state_regx = r'C{0}\s+[\w]+:\s({1})'.format(self.port, state)
            if re.search(state_regx, console_log) is None:
                return False
        return True

    def cc_disconnect_connect(self, disc_time_sec):
        """Disconnect/reconnect using Plankton

        Plankton supports a feature which simulates a USB Type C disconnect
        and reconnect.

        @param disc_time_sec: Time in seconds for disconnect period.
        """
        # First argument of the fake_disconnect command.
        # NOTE(review): presumably a delay in ms before the disconnect
        # starts -- confirm against the Plankton firmware.
        DISC_DELAY = 100
        disc_cmd = 'fake_disconnect %d %d' % (DISC_DELAY,
                                              disc_time_sec * 1000)
        self.utils.send_pd_command(disc_cmd)

    def drp_disconnect_connect(self, disc_time_sec):
        """Disconnect/reconnect using Plankton

        Utilize Plankton disconnect/connect utility and verify
        that both disconnect and reconnect actions were successful.

        @param disc_time_sec: Time in seconds for disconnect period.

        @returns True if device disconnects, then returns to a connected
                 state. False if either step fails.
        """
        # Divide by a float so an integer disc_time_sec is not floored
        # under Python 2 (1 / 2 == 0 would sample the state immediately).
        half_disc_time = disc_time_sec / 2.0
        self.cc_disconnect_connect(disc_time_sec)
        # Sample the state in the middle of the disconnect window
        time.sleep(half_disc_time)
        disconnect = self.is_disconnected()
        # Wait out the rest of the window plus the reconnect time
        time.sleep(half_disc_time + self.utils.CONNECT_TIME)
        connect = self.is_connected()
        return disconnect and connect

    def drp_set(self, mode):
        """Sets dualrole mode

        @param mode: desired dual role setting (on, off, snk, src)

        @returns True if dualrole mode matches the requested value or
                 is successfully set to that value. False, otherwise.
                 NOTE(review): some paths return the drp_enable int or the
                 result of set_pd_dualrole() rather than a bool.
        """
        # Get correct dualrole console response
        resp = self.utils.dualrole_resp[self.utils.dual_index[mode]]
        # Get current value of dualrole
        drp = self.utils.get_pd_dualrole()
        if drp == resp:
            return True

        if mode == 'on':
            # Setting drp_enable on Plankton will set dualrole mode to on
            return self._enable_plankton_drp()

        # If desired setting is other than 'on', need to ensure that
        # drp mode on Plankton is disabled.
        # NOTE(review): this tests the expected response for the requested
        # mode (resp), not the current setting (drp); since mode != 'on'
        # here, the branch looks unreachable unless dualrole_resp maps
        # another mode to 'on' -- confirm whether `drp` was intended.
        if resp == 'on':
            # This will turn off drp_enable flag and set dualmode to 'off'
            return self._toggle_plankton_drp()
        # With drp_enable flag off, can set to desired setting
        return self.utils.set_pd_dualrole(mode)

    def _reset(self, cmd, states_list):
        """Initiates a PD reset sequence

        Plankton device has state names available on the console. When
        a reset is issued the console log is extracted and then
        compared against the expected state transitions.

        @param cmd: console command which triggers the soft or hard reset
        @param states_list: list of expected PD state transitions

        @returns True if state transitions match, False otherwise
        """
        # Want to grab all output until either SRC_READY or SNK_READY
        reply_exp = [r'(.*)(C\d)\s+[\w]+:\s([\w]+_READY)']
        m = self.utils.send_pd_command_get_output(cmd, reply_exp)
        return self._verify_state_sequence(states_list, m[0][0])

    def _select_reset_states(self, src_states, snk_states):
        """Pick the expected state list for the current power role

        @param src_states: expected states when connected as a source
        @param snk_states: expected states when connected as a sink

        @returns src_states or snk_states depending on the current role

        @raises error.TestFail: if the port is neither SRC nor SNK connected
        """
        if self.is_src():
            return src_states
        if self.is_snk():
            return snk_states
        raise error.TestFail('Port Pair not in a connected state')

    def soft_reset(self):
        """Initiates a PD soft reset sequence

        @returns True if state transitions match, False otherwise

        @raises error.TestFail: if the port is not in a connected state
        """
        snk_reset_states = [
            'SOFT_RESET',
            'SNK_DISCOVERY',
            'SNK_REQUESTED',
            'SNK_TRANSITION',
            'SNK_READY'
        ]

        src_reset_states = [
            'SOFT_RESET',
            'SRC_DISCOVERY',
            'SRC_NEGOCIATE',
            'SRC_ACCEPTED',
            'SRC_POWERED',
            'SRC_TRANSITION',
            'SRC_READY'
        ]

        states_list = self._select_reset_states(src_reset_states,
                                                snk_reset_states)
        cmd = 'pd %d soft' % self.port
        return self._reset(cmd, states_list)

    def hard_reset(self):
        """Initiates a PD hard reset sequence

        @returns True if state transitions match, False otherwise

        @raises error.TestFail: if the port is not in a connected state
        """
        snk_reset_states = [
            'HARD_RESET_SEND',
            'HARD_RESET_EXECUTE',
            'SNK_HARD_RESET_RECOVER',
            'SNK_DISCOVERY',
            'SNK_REQUESTED',
            'SNK_TRANSITION',
            'SNK_READY'
        ]

        src_reset_states = [
            'HARD_RESET_SEND',
            'HARD_RESET_EXECUTE',
            'SRC_HARD_RESET_RECOVER',
            'SRC_DISCOVERY',
            'SRC_NEGOCIATE',
            'SRC_ACCEPTED',
            'SRC_POWERED',
            'SRC_TRANSITION',
            'SRC_READY'
        ]

        states_list = self._select_reset_states(src_reset_states,
                                                snk_reset_states)
        cmd = 'pd %d hard' % self.port
        return self._reset(cmd, states_list)
Scottcae6fe42016-02-08 16:26:17 -0800586
Scottc98bfd72016-02-22 16:18:53 -0800587
class PDPortPartner(object):
    """Methods used to instantiate PD device objects

    This class is initialized with a list of servo consoles. It
    contains methods to determine if USB PD devices are accessible
    via the consoles and attempts to determine USB PD port partners.
    A PD device is USB PD port specific, a single console may access
    multiple PD devices.

    """

    def __init__(self, consoles):
        """Initialization method

        @param consoles: list of servo consoles
        """
        self.consoles = consoles

    def _send_pd_state(self, port, console):
        """Tests if PD device exists on a given port number

        @param port: USB PD port number to try
        @param console: servo UART console

        @returns True if 'pd <port> state' command gives a valid
                 response, False otherwise
        """
        cmd = 'pd %d state' % port
        # '(Parameter)' lets the command match even when the port number is
        # rejected, so a missing port is reported instead of raising.
        regex = r'(Port C\d)|(Parameter)'
        m = console.send_command_get_output(cmd, [regex])
        # If PD port exists, then output will be Port C0 or C1
        regex = r'Port C{0}'.format(port)
        return bool(re.search(regex, m[0][0]))

    def _find_num_pd_ports(self, console):
        """Determine number of PD ports for a given console

        @param console: uart console accessed via servo

        @returns: number of PD ports accessible via console
        """
        MAX_PORTS = 2
        num_ports = 0
        for port in range(MAX_PORTS):
            if self._send_pd_state(port, console):
                num_ports += 1
        return num_ports

    def _is_pd_console(self, console):
        """Check if pd option exists in console

        @param console: uart console accessed via servo

        @returns: True if 'pd' is found, False otherwise
        """
        try:
            console.send_command_get_output('help', [r'(pd)\s+'])
            return True
        except error.TestFail:
            return False

    def _is_plankton_console(self, console):
        """Check for Plankton console

        This method looks for a console command option 'usbc_action' which
        is unique to Plankton PD devices.

        @param console: uart console accessed via servo

        @returns True if usbc_action command is present, False otherwise
        """
        try:
            console.send_command_get_output('help', [r'(usbc_action)'])
            return True
        except error.TestFail:
            return False

    def _check_port_pair(self, dev_pair):
        """Check if two PD devices could be connected

        If two USB PD devices are connected, then they should be in
        either the SRC_READY or SNK_READY states and have opposite
        power roles. In addition, they must be on different servo
        consoles.

        @param dev_pair: list of two possible PD port partners

        @returns True if not the same console and both PD devices
                 are a plausible pair based only on their PD states.
        """
        # Don't test if on the same servo console
        if dev_pair[0].console == dev_pair[1].console:
            logging.info('PD Devices are on same platform -> cant be a pair')
            return False
        # Must be SRC <--> SNK or SNK <--> SRC
        return bool((dev_pair[0].is_src() and dev_pair[1].is_snk()) or
                    (dev_pair[0].is_snk() and dev_pair[1].is_src()))

    def _verify_plankton_connection(self, dev_pair):
        """Verify DUT to Plankton PD connection

        This method checks for a Plankton PD connection for the
        given port by first verifying if a PD connection is present.
        If found, then it uses a Plankton feature to force a PD disconnect.
        If the port is no longer in the connected state, and following
        a delay, is found to be back in the connected state, then
        a DUT pd to Plankton connection is verified.

        @param dev_pair: list of two PD devices

        @returns True if DUT to Plankton pd connection is verified
        """
        # All three values are in seconds
        DISC_CHECK_TIME = .5
        DISC_WAIT_TIME = 2
        CONNECT_TIME = 4

        if not self._check_port_pair(dev_pair):
            return False

        # Either device of the pair may be the Plankton, so try both; a
        # device that cannot fake a disconnect raises NotImplementedError.
        for index, dev in enumerate(dev_pair):
            try:
                # Force PD disconnect
                dev.cc_disconnect_connect(DISC_WAIT_TIME)
                time.sleep(DISC_CHECK_TIME)
                # Verify that both devices are now disconnected
                if (dev_pair[0].is_disconnected() and
                        dev_pair[1].is_disconnected()):
                    # Allow enough time for reconnection
                    time.sleep(DISC_WAIT_TIME + CONNECT_TIME)
                    if self._check_port_pair(dev_pair):
                        # Have verified a pd disconnect/reconnect sequence
                        logging.info('Plankton <-> DUT pair found')
                        return True
                else:
                    # Disconnect was not seen on both devices; still wait
                    # the same period before the next attempt.
                    time.sleep(DISC_WAIT_TIME + CONNECT_TIME)
            except NotImplementedError:
                logging.info('dev %d is not Plankton', index)
        return False

    def identify_pd_devices(self):
        """Instantiate PD devices present in test setup

        @returns list of 2 PD devices if a DUT <-> Plankton found. If
                 not found, then returns an empty list.
        """
        devices = []
        # For each possible uart console, check to see if a PD console
        # is present and determine the number of PD ports.
        for console in self.consoles:
            if not self._is_pd_console(console):
                continue
            is_plank = self._is_plankton_console(console)
            num_ports = self._find_num_pd_ports(console)
            # For each PD port that can be accessed via the console,
            # instantiate either PDConsole or PDPlankton device.
            for port in range(num_ports):
                if is_plank:
                    logging.info('Plankton PD Device on port %d', port)
                    devices.append(PDPlanktonDevice(console, port))
                else:
                    devices.append(PDConsoleDevice(console, port))
                    logging.info('Console PD Device on port %d', port)

        # Determine PD port partners in the list of PD devices. Note, that
        # there can be PD devices which are not accessible via a uart console,
        # but are connected to a PD port which is accessible.
        # Return the first verified pair: the device list is never mutated
        # while it is being walked, so no index can run past its end.
        for idx, deva in enumerate(devices):
            for devb in devices[idx + 1:]:
                pair = [deva, devb]
                if self._verify_plankton_connection(pair):
                    return pair
        return []
765 return test_pair
766