servo: abstract dts mode logic

This change adds a few helpers to handle dts mode

- dts_mode_is_valid: whether dts mode can be supported.
- dts_mode_is_safe: whether controlling dts mode will remove the
main device's console access

- get_dts_mode: helper to get the dts mode

BUG=chromium:1009616

TEST=test_that --autotest_dir . $DIP firmware_Cr50CCDServoCap

/tmp/test_that_results_b2EdTg/results-1-firmware_Cr50CCDServoCap
[  PASSED  ]

Note: all tests below are done by adding two logging statements into
initialize_dut() and subsequently asseting False to speed debug.

TEST=sudo servod -b atlas // v4 (type-c) + micro
     test_that --autotest_dir . $AIP power_Monitoring -b atlas

12:03:08 INFO | autoserv| TYPE: servo_v4_with_servo_micro
12:03:08 INFO | autoserv| DTS MODE: True

TEST=sudo servod -b atlas // v4 (type-c) + ccd
     test_that --autotest_dir . $AIP power_Monitoring -b atlas

12:05:20 INFO | autoserv| TYPE: servo_v4_with_ccd_cr50
12:05:20 INFO | autoserv| DTS MODE: True

TEST=sudo servod -b atlas // v4 (type-A) + micro
     test_that --autotest_dir . $AIP power_Monitoring -b atlas

12:06:49 INFO | autoserv| TYPE: servo_v4_with_servo_micro
12:06:50 INFO | autoserv| DTS controls require a type-c servo v4.
12:06:50 INFO | autoserv| DTS MODE: False

TEST=sudo servod -b atlas // v2
     test_that --autotest_dir . $AIP power_Monitoring -b atlas

12:08:19 INFO | autoserv| TYPE: servo_v2
12:08:19 INFO | autoserv| DTS MODE: False

Change-Id: I7f342691bf84896df5a779555bf33334ff1c17b7
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/third_party/autotest/+/1835913
Tested-by: Ruben Rodriguez Buchillon <coconutruben@chromium.org>
Commit-Queue: Ruben Rodriguez Buchillon <coconutruben@chromium.org>
Reviewed-by: Mary Ruthven <mruthven@chromium.org>
diff --git a/server/cros/servo/chrome_cr50.py b/server/cros/servo/chrome_cr50.py
index 85ab1e0..a7a2ffc 100644
--- a/server/cros/servo/chrome_cr50.py
+++ b/server/cros/servo/chrome_cr50.py
@@ -14,14 +14,15 @@
 from autotest_lib.server.cros.servo import chrome_ec
 
 
-def servo_v4_command(func):
-    """Decorator for methods only relevant to tests running with servo v4."""
+def dts_control_command(func):
+    """For methods that should only run when dts mode control is supported."""
     @functools.wraps(func)
     def wrapper(instance, *args, **kwargs):
-        """Ignore servo v4 functions it's not being used."""
-        if instance.using_servo_v4():
+        """Ignore those functions if dts mode control is not supported."""
+        if instance._servo.dts_mode_is_valid():
             return func(instance, *args, **kwargs)
-        logging.info("not using servo v4. ignoring %s", func.func_name)
+        logging.info('Servo setup does not support DTS mode. ignoring %s',
+                     func.func_name)
     return wrapper
 
 
@@ -618,11 +619,6 @@
         return self.get_active_version_info()[1].strip()
 
 
-    def using_servo_v4(self):
-        """Returns true if the console is being served using servo v4"""
-        return 'servo_v4' in self._servo.get_servo_version()
-
-
     def ccd_is_enabled(self):
         """Return True if ccd is enabled.
 
@@ -639,7 +635,7 @@
             return not bool(self.gpioget('CCD_MODE_L'))
 
 
-    @servo_v4_command
+    @dts_control_command
     def wait_for_stable_ccd_state(self, state, timeout, raise_error):
         """Wait up to timeout seconds for CCD to be 'on' or 'off'
 
@@ -673,31 +669,31 @@
         logging.info("ccd is %r", 'on' if enabled else 'off')
 
 
-    @servo_v4_command
+    @dts_control_command
     def wait_for_ccd_disable(self, timeout=60, raise_error=True):
         """Wait for the cr50 console to stop working"""
         self.wait_for_stable_ccd_state('off', timeout, raise_error)
 
 
-    @servo_v4_command
+    @dts_control_command
     def wait_for_ccd_enable(self, timeout=60, raise_error=False):
         """Wait for the cr50 console to start working"""
         self.wait_for_stable_ccd_state('on', timeout, raise_error)
 
 
-    @servo_v4_command
+    @dts_control_command
     def ccd_disable(self, raise_error=True):
         """Change the values of the CC lines to disable CCD"""
         logging.info("disable ccd")
-        self._servo.set_servo_v4_dts_mode('off')
+        self._servo.set_dts_mode('off')
         self.wait_for_ccd_disable(raise_error=raise_error)
 
 
-    @servo_v4_command
+    @dts_control_command
     def ccd_enable(self, raise_error=False):
         """Reenable CCD and reset servo interfaces"""
         logging.info("reenable ccd")
-        self._servo.set_servo_v4_dts_mode('on')
+        self._servo.set_dts_mode('on')
         # If the test is actually running with ccd, wait for USB communication
         # to come up after reset.
         if self._servo.main_device_is_ccd():
@@ -901,17 +897,16 @@
         return float(result[0][1])
 
 
-    def servo_v4_supports_dts_mode(self):
-        """Returns True if cr50 registers changes in servo v4 dts mode."""
+    def servo_dts_mode_is_valid(self):
+        """Returns True if cr50 registers change in servo dts mode."""
         # This is to test that Cr50 actually recognizes the change in ccd state
         # We cant do that with tests using ccd, because the cr50 communication
         # goes down once ccd is enabled.
-        if (self._servo.get_servo_version(active=True) !=
-            'servo_v4_with_servo_micro'):
+        if not self._servo.dts_mode_is_safe():
             return False
 
         ccd_start = 'on' if self.ccd_is_enabled() else 'off'
-        dts_start = self._servo.get('servo_v4_dts_mode')
+        dts_start = self._servo.get_dts_mode()
         try:
             # Verify both ccd enable and disable
             self.ccd_disable(raise_error=True)
@@ -920,7 +915,7 @@
         except Exception, e:
             logging.info(e)
             rv = False
-        self._servo.set_servo_v4_dts_mode(dts_start)
+        self._servo.set_dts_mode(dts_start)
         self.wait_for_stable_ccd_state(ccd_start, 60, True)
         logging.info('Test setup does%s support servo DTS mode',
                 '' if rv else 'n\'t')
diff --git a/server/cros/servo/servo.py b/server/cros/servo/servo.py
index 2908a78..54b5cba 100644
--- a/server/cros/servo/servo.py
+++ b/server/cros/servo/servo.py
@@ -1078,6 +1078,25 @@
         """Whether the main servo device (no prefixes) is a legacy device."""
         return not self.main_device_is_ccd()
 
+
+    def main_device_is_active(self):
+        """Return whether the main device is the active device.
+
+        This is only relevant for a dual setup with ccd and legacy on the same
+        DUT. The main device is the servo that has no prefix on its controls.
+        This helper answers the question whether that device is also the
+        active device or not.
+        """
+        # TODO(coconutruben): The current implementation of the dual setup only
+        # ever has legacy as the main device. Therefore, it suffices to ask
+        # whether the active device is ccd.
+        if not self.dts_mode_is_valid():
+            # Use dts support as a proxy to whether the servo setup could
+            # support a dual role. Only those setups now support legacy and ccd.
+            return True
+        active_device = self.get('active_v4_device')
+        return 'ccd_cr50' not in active_device
+
     def _initialize_programmer(self, rw_only=False):
         """Initialize the firmware programmer.
 
@@ -1321,8 +1340,40 @@
         logging.info('Charger port voltage: %dmV', chg_port_mv)
         return True
 
-    def set_servo_v4_dts_mode(self, state):
-        """Set servo v4 dts mode to off or on.
+    def dts_mode_is_valid(self):
+        """Return whether servo setup supports dts mode control for cr50."""
+        if 'servo_v4' not in self._servo_type:
+            # Only servo v4 supports this feature.
+            logging.debug('%r type does not support dts mode control.',
+                          self._servo_type)
+            return False
+        # On servo v4, it still needs ot be the type-c version.
+        if not 'type-c' == self.get('servo_v4_type'):
+            logging.info('DTS controls require a type-c servo v4.')
+            return False
+        return True
+
+    def dts_mode_is_safe(self):
+        """Return whether servo setup supports dts mode without losing access.
+
+        DTS mode control exists but the main device might go through ccd.
+        In that case, it's only safe to control dts mode if the main device
+        is legacy as otherwise the connection to the main device cuts out.
+        """
+        return self.dts_mode_is_valid() and self.main_device_is_flex()
+
+    def get_dts_mode(self):
+        """Return servo dts mode.
+
+        @returns: on/off whether dts is on or off
+        """
+        if not self.dts_mode_is_valid():
+            logging.info('Not a valid servo setup. Unable to get dts mode.')
+            return
+        return self.get('servo_v4_dts_mode')
+
+    def set_dts_mode(self, state):
+        """Set servo dts mode to off or on.
 
         It does nothing if not a servo v4. Disable the ccd watchdog if we're
         disabling dts mode. CCD will disconnect. The watchdog only allows CCD
@@ -1331,8 +1382,9 @@
 
         @param state: Set servo v4 dts mode 'off' or 'on'.
         """
-        if not self._servo_type.startswith('servo_v4'):
-            logging.debug('Not a servo v4, unable to set dts mode %s.', state)
+        if not self.dts_mode_is_valid():
+            logging.info('Not a valid servo setup. Unable to set dts mode %s.',
+                         state)
             return
 
         # TODO(mruthven): remove watchdog check once the labstation has been
diff --git a/server/site_tests/firmware_Cr50CCDServoCap/firmware_Cr50CCDServoCap.py b/server/site_tests/firmware_Cr50CCDServoCap/firmware_Cr50CCDServoCap.py
index 2c7c432..05a5aec 100644
--- a/server/site_tests/firmware_Cr50CCDServoCap/firmware_Cr50CCDServoCap.py
+++ b/server/site_tests/firmware_Cr50CCDServoCap/firmware_Cr50CCDServoCap.py
@@ -109,7 +109,7 @@
             'servo_v4_with_servo_micro'):
             raise error.TestNAError('Must use servo v4 with servo micro')
 
-        if not self.cr50.servo_v4_supports_dts_mode():
+        if not self.cr50.servo_dts_mode_is_valid():
             raise error.TestNAError('Need working servo v4 DTS control')
 
         self.check_servo_monitor()
@@ -250,7 +250,7 @@
 
         @param state: string 'attach' or 'detach'
         """
-        self.servo.set_servo_v4_dts_mode('on' if state == 'attach' else 'off')
+        self.servo.set_dts_mode('on' if state == 'attach' else 'off')
         time.sleep(self.SLEEP)
 
 
diff --git a/server/site_tests/firmware_Cr50DeferredECReset/firmware_Cr50DeferredECReset.py b/server/site_tests/firmware_Cr50DeferredECReset/firmware_Cr50DeferredECReset.py
index 0a3935a..60606d8 100644
--- a/server/site_tests/firmware_Cr50DeferredECReset/firmware_Cr50DeferredECReset.py
+++ b/server/site_tests/firmware_Cr50DeferredECReset/firmware_Cr50DeferredECReset.py
@@ -71,9 +71,9 @@
         if not hasattr(self, 'cr50'):
             raise error.TestNAError('Test can only be run on devices with '
                                     'access to the Cr50 console')
-        if not self.cr50.servo_v4_supports_dts_mode():
+        if not self.cr50.servo_dts_mode_is_valid():
             raise error.TestNAError('Need working servo v4 DTS control')
-        self.dts_restore = self.servo.get('servo_v4_dts_mode')
+        self.dts_restore = self.servo.get_dts_mode()
 
         # Fast open cr50 and check if testlab is enabled.
         self.fast_open(enable_testlab=True)
@@ -119,7 +119,7 @@
         # Check if the dut has any RDD recognition issue.
         # First, cut off power source and hold the power button.
         #        disable RDD connection.
-        self.servo.set_servo_v4_dts_mode('off')
+        self.servo.set_dts_mode('off')
         self.servo.set('pwr_button', 'press')
         self.cr50_power_on_reset()
         try:
@@ -131,7 +131,7 @@
                 raise error.TestError('RDD leakage does not match capability'
                                       ' configuration.')
         finally:
-            self.servo.set_servo_v4_dts_mode(self.dts_restore)
+            self.servo.set_dts_mode(self.dts_restore)
             self.servo.set_nocheck('pwr_button', 'release')
             time.sleep(self.PD_SETTLE_TIME)
 
@@ -211,7 +211,7 @@
 
         try:
             # enable RDD Connection (or disable) before power-on-reset
-            self.servo.set_servo_v4_dts_mode('on' if rdd_enable else 'off')
+            self.servo.set_dts_mode('on' if rdd_enable else 'off')
 
             # Set power button before the dut loses power,
             # because the power button value cannot be changed during power-off.
@@ -246,7 +246,7 @@
             else:
                 self.servo.set_nocheck('servo_v4_role', 'src')
 
-            self.servo.set_servo_v4_dts_mode(self.dts_restore)
+            self.servo.set_dts_mode(self.dts_restore)
             time.sleep(1)
 
             # Press power button to wake up AP, and releases it soon
diff --git a/server/site_tests/firmware_Cr50DeviceState/firmware_Cr50DeviceState.py b/server/site_tests/firmware_Cr50DeviceState/firmware_Cr50DeviceState.py
index 9d46f0a..c3d7ed2 100644
--- a/server/site_tests/firmware_Cr50DeviceState/firmware_Cr50DeviceState.py
+++ b/server/site_tests/firmware_Cr50DeviceState/firmware_Cr50DeviceState.py
@@ -471,7 +471,7 @@
         self.all_errors = {}
         self.host = host
         self.is_arm = self.is_arm_family()
-        supports_dts_control = self.cr50.servo_v4_supports_dts_mode()
+        supports_dts_control = self.cr50.servo_dts_mode_is_valid()
 
         if supports_dts_control:
             self.cr50.ccd_disable(raise_error=True)
diff --git a/server/site_tests/firmware_Cr50OpenWhileAPOff/firmware_Cr50OpenWhileAPOff.py b/server/site_tests/firmware_Cr50OpenWhileAPOff/firmware_Cr50OpenWhileAPOff.py
index 7c593c2..2c5afe6 100644
--- a/server/site_tests/firmware_Cr50OpenWhileAPOff/firmware_Cr50OpenWhileAPOff.py
+++ b/server/site_tests/firmware_Cr50OpenWhileAPOff/firmware_Cr50OpenWhileAPOff.py
@@ -41,7 +41,7 @@
             'servo_v4_with_servo_micro'):
             raise error.TestNAError('Run using servo v4 with servo micro')
 
-        if not self.cr50.servo_v4_supports_dts_mode():
+        if not self.cr50.servo_dts_mode_is_valid():
             raise error.TestNAError('Plug in servo v4 type c cable into ccd '
                     'port')
 
@@ -163,7 +163,7 @@
 
     def set_dts(self, state):
         """Set servo v4 dts mode"""
-        self.servo.set_servo_v4_dts_mode(state)
+        self.servo.set_dts_mode(state)
         # Some boards can't detect DTS mode when the EC is off. After 0.X.18,
         # we can set CCD_MODE_L manually using gpioset. If detection is working,
         # this won't do anything. If it isn't working, it'll force cr50 to
diff --git a/server/site_tests/firmware_Cr50RddG3/firmware_Cr50RddG3.py b/server/site_tests/firmware_Cr50RddG3/firmware_Cr50RddG3.py
index aee03c9..fcf1022 100644
--- a/server/site_tests/firmware_Cr50RddG3/firmware_Cr50RddG3.py
+++ b/server/site_tests/firmware_Cr50RddG3/firmware_Cr50RddG3.py
@@ -27,11 +27,11 @@
             raise error.TestNAError('Leakage on the rdd signals breaks '
                                     'detection in G3')
 
-        self.servo.set_servo_v4_dts_mode('on')
+        self.servo.set_dts_mode('on')
         if not self.rdd_is_connected():
             raise error.TestNAError('Cr50 does not detect Rdd with dts mode on')
 
-        self.servo.set_servo_v4_dts_mode('off')
+        self.servo.set_dts_mode('off')
 
         if self.rdd_is_connected():
             raise error.TestFail('Cr50 detects Rdd with dts mode off')
@@ -43,7 +43,7 @@
         time.sleep(self.WAIT_FOR_STATE)
         rdd_connected = self.rdd_is_connected()
 
-        self.servo.set_servo_v4_dts_mode('on')
+        self.servo.set_dts_mode('on')
         self._try_to_bring_dut_up()
 
         if rdd_connected: