faft: Refactored PDConnect test to use pd_device objects

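The existing version of this test was written before the
pd_device objects were developed. Refactor the test to take
advantage of that encapsulation; the overall test is now much
simpler to both implement and maintain. Also add an
enable_console_channel() helper to chrome_ec.py so the test
can limit console output to the 'usbpd' channel.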

BRANCH=none
BUG=chrome-os-partner:53742
TEST=Manual
Tested using Samus and Plankton with workstation.

Change-Id: Id2d93fae7709d799e57b1837b5dff6a3e8148319
Signed-off-by: Scott <scollyer@chromium.org>
Reviewed-on: https://chromium-review.googlesource.com/347447
Commit-Ready: Scott Collyer <scollyer@chromium.org>
Tested-by: Scott Collyer <scollyer@chromium.org>
Reviewed-by: Todd Broch <tbroch@chromium.org>
diff --git a/server/cros/servo/chrome_ec.py b/server/cros/servo/chrome_ec.py
index 0d3e4e1..690716c 100644
--- a/server/cros/servo/chrome_ec.py
+++ b/server/cros/servo/chrome_ec.py
@@ -220,3 +220,17 @@
         # See chromium:371631 for details.
         # FIXME: Stop importing time module if this hack becomes obsolete.
         time.sleep(1)
+
+    def enable_console_channel(self, channel):
+        """Look up a console channel's mask and enable only that channel
+
+        @param channel: console channel name
+        """
+        # 'chan' lists the console channels along with their channel
+        # numbers and channel masks; match the row for this channel.
+        pattern = r'(\d+)\s+([\w]+)\s+\*?\s+{0}'.format(channel)
+        matches = self.send_command_get_output('chan', [pattern])
+        # Element [0][2] of the result is the channel mask; prepend 0x
+        # so the console reads it as a hex input value.
+        channel_mask = matches[0][2]
+        self.send_command('chan 0x' + channel_mask)
diff --git a/server/site_tests/firmware_PDConnect/control b/server/site_tests/firmware_PDConnect/control
index 482cce4..6d08f46 100644
--- a/server/site_tests/firmware_PDConnect/control
+++ b/server/site_tests/firmware_PDConnect/control
@@ -21,9 +21,11 @@
 
 args_dict = utils.args_to_dict(args)
 servo_args = hosts.CrosHost.get_servo_arguments(args_dict)
+plankton_args = hosts.CrosHost.get_plankton_arguments(args_dict)
 
 def run(machine):
-    host = hosts.create_host(machine, servo_args=servo_args)
+    host = hosts.create_host(machine, servo_args=servo_args,
+                             plankton_args=plankton_args)
     job.run_test("firmware_PDConnect", host=host, cmdline_args=args,
                  disable_sysinfo=True)
 
diff --git a/server/site_tests/firmware_PDConnect/firmware_PDConnect.py b/server/site_tests/firmware_PDConnect/firmware_PDConnect.py
index b82cc0c..c9b9cde 100644
--- a/server/site_tests/firmware_PDConnect/firmware_PDConnect.py
+++ b/server/site_tests/firmware_PDConnect/firmware_PDConnect.py
@@ -3,121 +3,82 @@
 # found in the LICENSE file.
 
 import logging
-import time
 
 from autotest_lib.client.common_lib import error
 from autotest_lib.server.cros.faft.firmware_test import FirmwareTest
-from autotest_lib.server.cros.servo import pd_console
+from autotest_lib.server.cros.servo import pd_device
 
 
 class firmware_PDConnect(FirmwareTest):
     """
-    Servo based USB PD connect/disconnect test. This test is written
-    for the DUT and requires that the DUT support dualrole (SRC or SNK)
-    operation in order to force a disconnect and connect event. The test
-    does not depend on the DUT acting as source or sink, either mode
-    should pass.
+    Servo based USB PD connect/disconnect test. If Plankton is not one
+    of the device pair elements, then this test requires that at least
+    one of the devices support dual role mode in order to force a disconnect
+    to connect sequence. The test does not depend on the DUT acting as source
+    or sink, either mode should pass.
 
     Pass critera is 100%  of connections resulting in successful connections
-
     """
     version = 1
+    CONNECT_ITERATIONS = 10
+    def _test_connect(self, port_pair):
+        """Run disconnect/connect cycles on each device of a port pair
 
+        @param port_pair: list of 2 connected PD devices
+        """
+        RECONNECT_DELAY = 2  # seconds between disconnect and connect commands
+        for dev in port_pair:
+            for attempt in xrange(self.CONNECT_ITERATIONS):
+                logging.info('Disconnect/Connect iteration %d', attempt)
+                try:
+                    if dev.drp_disconnect_connect(RECONNECT_DELAY) == False:
+                        raise error.TestFail('Disconnect/Connect Failed')
+                except NotImplementedError:
+                    logging.warning(
+                            'Device does not support disconnect/connect')
+                    break
 
     def initialize(self, host, cmdline_args):
         super(firmware_PDConnect, self).initialize(host, cmdline_args)
         # Only run in normal mode
         self.switcher.setup_mode('normal')
-        self.usbpd.send_command("chan 0")
+        self.usbpd.enable_console_channel('usbpd')
 
 
     def cleanup(self):
-        self.usbpd.send_command("chan 0xffffffff")
+        self.usbpd.send_command('chan 0xffffffff')
         super(firmware_PDConnect, self).cleanup()
 
 
-    def _test_pd_connection(self, connect_state, port):
-        """Verify current pd state matches the expected value.
-
-        The current state will be read up to 2 times. This
-        may not be required, but during development testing
-        instances were observed where state reads on glados
-        did not give the full state string which would then
-        applear to be a failure even though the type C connection
-        had been made.
-
-        @params connect_state: Expected state string
-        @params port: port number <0/1> to query
-        @returns: True if state matches, false otherwise
-        """
-        for attempts in range(1,3):
-            pd_state = self.pd.get_pd_state(self.port)
-            if pd_state == connect_state:
-                return True
-        return False
-
-
     def run_once(self):
         """Exectue disconnect/connect sequence test
 
         """
-        # delay between test iterations
-        DUALROLE_SET_DELAY = 2
 
-        # create objects for pd utilities
-        self.pd = pd_console.PDConsoleUtils(self.usbpd)
+        # Create list of available UART consoles
+        consoles = [self.usbpd, self.plankton]
+        port_partner = pd_device.PDPortPartner(consoles)
+        # Identify a valid test port pair
+        port_pair = port_partner.identify_pd_devices()
+        if not port_pair:
+            raise error.TestFail('No PD connection found!')
 
-        # Make sure PD support exists in the UART console
-        if self.pd.verify_pd_console() == False:
-            raise error.TestFail("pd command not present on console!")
-
-        # Enable dualrole mode
-        self.pd.set_pd_dualrole('on')
-        time.sleep(DUALROLE_SET_DELAY)
-
-        # Type C connection (PD contract) should exist at this point
-        connect_status = self.pd.query_pd_connection()
-        if connect_status['connect'] == False:
-            raise error.TestFail("pd connection not found")
-        # Record port where type C connection was detected
-        self.port = connect_status['port']
-        # Save the SRC vs SNK state
-        connect_state = connect_status['role']
-
-        logging.info('Type C connection detected on Port %d: %r',
-                     self.port, connect_state)
-
-        # determine the dualrole command to connect/disconnect
-        if  connect_state == 'SRC_READY':
-            disc_cmd = 'snk'
-            connect_cmd = 'src'
+        # Test disconnect/connect sequences
+        self._test_connect(port_pair)
+        # Swap power roles (if possible). Note the pr swap is attempted
+        # for both devices in the connection. This ensures that a device
+        # such as Plankton, which is dualrole capable, but has this mode
+        # disabled by default, won't prevent the device pair from role swapping.
+        swap = False
+        for dev in port_pair:
+            try:
+                if dev.pr_swap():
+                    swap = True
+                    break
+            except NotImplementedError:
+                logging.warning('device cant send power role swap command')
+        if swap:
+            # Power role has been swapped, retest.
+            self._test_connect(port_pair)
         else:
-            disc_cmd = 'src'
-            connect_cmd = 'snk'
-
-        # counter used for successful iterations
-        success = 0
-        total_attempts = 100
-
-        # Attempt connect/disconnect iterations
-        for test_count in range(1, total_attempts + 1):
-            logging.info ('\n************ Iteration %r ***************',
-                          test_count)
-            # Force Type C disconnect
-            self.pd.set_pd_dualrole(disc_cmd)
-            time.sleep(DUALROLE_SET_DELAY)
-            # Attempt to reconnect
-            self.pd.set_pd_dualrole(connect_cmd)
-            time.sleep(DUALROLE_SET_DELAY)
-            # Verify connection was successful
-            if self._test_pd_connection(connect_state, self.port) == True:
-                success += 1
-
-        self.pd.set_pd_dualrole('on')
-        logging.info ('************ Connection Stats ***************')
-        logging.info ('Attempts = %d: Connections = %d', test_count, success)
-        logging.info ('*********************************************')
-        if success != total_attempts:
-            raise error.TestFail("Attempts = " + str(total_attempts) +
-                                 ': Success = ' + str(success))
-
+            logging.warning('Device pair could not role swap, ending test')