Abstract methods in ChameleonBoard and ChameleonPort

The abstraction of ChameleonBoard and ChameleonPort hide the underlying
XMLRPC connection and provide some helper methods.

BUG=chromium:336731
TEST=manual
Run display_Resolution test on Link. Still work fine.

Change-Id: Ic987949152e21995f11750ac62e27b51401f586b
Reviewed-on: https://chromium-review.googlesource.com/183436
Reviewed-by: Hung-ying Tyan <tyanh@chromium.org>
Commit-Queue: Wai-Hong Tam <waihong@chromium.org>
Tested-by: Wai-Hong Tam <waihong@chromium.org>
diff --git a/server/cros/chameleon/chameleon.py b/server/cros/chameleon/chameleon.py
new file mode 100644
index 0000000..7078fa6
--- /dev/null
+++ b/server/cros/chameleon/chameleon.py
@@ -0,0 +1,128 @@
+# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import xmlrpclib
+
+
+class ChameleonBoard(object):
+    """ChameleonBoard is an abstraction of a Chameleon board.
+
+    A Chameleond RPC proxy is passed to the construction such that it can
+    use this proxy to control the Chameleon board.
+    """
+
+    def __init__(self, chameleond_proxy):
+        """Construct a ChameleonBoard.
+
+        @param chameleond_proxy: Chameleond RPC proxy object.
+        """
+        self._chameleond_proxy = chameleond_proxy
+
+
+    def reset(self):
+        """Resets Chameleon board."""
+        self._chameleond_proxy.Reset()
+
+
+    def get_all_ports(self):
+        """Gets all the ports on Chameleon board which are connected.
+
+        @return: A list of ChameleonPort objects.
+        """
+        ports = self._chameleond_proxy.ProbeInputs()
+        return [ChameleonPort(self._chameleond_proxy, port) for port in ports]
+
+
+class ChameleonPort(object):
+    """ChameleonPort is an abstraction of a port of a Chameleon board.
+
+    A Chameleond RPC proxy and an input_id are passed to the construction.
+    The input_id is the unique identity to the port.
+    """
+
+    def __init__(self, chameleond_proxy, input_id):
+        """Construct a ChameleonPort.
+
+        @param chameleond_proxy: Chameleond RPC proxy object.
+        @param input_id: The ID of the input port.
+        """
+        self._chameleond_proxy = chameleond_proxy
+        self._input_id = input_id
+
+
+    def get_connector_id(self):
+        """Returns the connector ID.
+
+        @return: A number of connector ID.
+        """
+        return self._input_id
+
+
+    def get_connector_type(self):
+        """Returns the human readable string for the connector type.
+
+        @return: A string, like "VGA", "DVI", "HDMI", or "DP".
+        """
+        return self._chameleond_proxy.GetConnectorType(self._input_id)
+
+
+    def read_edid(self):
+        """Reads the EDID.
+
+        @return: A byte-array of EDID.
+        """
+        return self._chameleond_proxy.ReadEdid(self._input_id).data
+
+
+    def apply_edid(self, edid_data):
+        """Applies the given EDID.
+
+        @param edid_data: A byte-array of EDID.
+        """
+        edid_id = self._chameleond_proxy.CreateEdid(xmlrpclib.Binary(edid_data))
+        self._chameleond_proxy.ApplyEdid(self._input_id, edid_id)
+        self._chameleond_proxy.DestoryEdid(edid_id)
+
+
+    def plug(self):
+        """Asserts HPD line to high, emulating plug."""
+        self._chameleond_proxy.Plug(self._input_id)
+
+
+    def unplug(self):
+        """Deasserts HPD line to low, emulating unplug."""
+        self._chameleond_proxy.Unplug(self._input_id)
+
+
+    def fire_hpd_pulse(self, deassert_interval_usec, assert_interval_usec=None,
+                       repeat_count=1):
+        """Fires a HPD pulse (high -> low -> high) or multiple HPD pulses.
+
+        @param deassert_interval_usec: The time of the deassert pulse.
+        @param assert_interval_usec: The time of the assert pulse.
+        @param repeat_count: The count of repeating the HPD pulses.
+        """
+        self._chameleond_proxy.FireHpdPulse(
+                self._input_id, deassert_interval_usec, assert_interval_usec,
+                repeat_count)
+
+
+    def capture_screen(self, file_path):
+        """Captures Chameleon framebuffer.
+
+        @param file_path: The path of file for output.
+
+        @return The byte-array for the screen.
+        """
+        pixels = self._chameleond_proxy.DumpPixels(self._input_id).data
+        open(file_path, 'w+').write(pixels)
+        return pixels
+
+
+    def get_resolution(self):
+        """Gets the source resolution.
+
+        @return: A (width, height) tuple.
+        """
+        return self._chameleond_proxy.DetectResolution(self._input_id)
diff --git a/server/hosts/chameleon_host.py b/server/hosts/chameleon_host.py
index 04457dc..10e451c 100644
--- a/server/hosts/chameleon_host.py
+++ b/server/hosts/chameleon_host.py
@@ -9,6 +9,7 @@
 import xmlrpclib
 
 from autotest_lib.client.bin import utils
+from autotest_lib.server.cros.chameleon import chameleon
 from autotest_lib.server.hosts import ssh_host
 
 
@@ -63,16 +64,6 @@
         return self._is_in_lab
 
 
-    def get_chameleond_proxy(self):
-        """Return a proxy that can be used to communicate with chameleond.
-
-        @returns: An xmlrpclib.ServerProxy that is connected to the chameleond
-                  on the host.
-
-        """
-        return self._chameleond_proxy
-
-
     def get_wait_up_processes(self):
         """Get the list of local processes to wait for in wait_up.
 
@@ -85,3 +76,9 @@
         processes = [self.CHAMELEOND_PROCESS]
         return processes
 
+
+    def create_chameleon_board(self):
+        """Create a ChameleonBoard object."""
+        # TODO(waihong): Add verify and repair logic which are required while
+        # deploying to Cros Lab.
+        return chameleon.ChameleonBoard(self._chameleond_proxy)
diff --git a/server/hosts/cros_host.py b/server/hosts/cros_host.py
index 65acc52..373e6ee 100644
--- a/server/hosts/cros_host.py
+++ b/server/hosts/cros_host.py
@@ -285,13 +285,8 @@
         # a servo is required, i.e. when the servo_args is not None.
         if servo_args is not None:
             self.servo = self._servo_host.create_healthy_servo_object()
-        # TODO(waihong): Create a wrapper for the Chameleond Proxy such that
-        # we can hide the RPC specific logic, like wrapping the binary data
-        # in xmlrpclib.Binary().
-        # TODO(waihong): Add verify and repair logic which are required while
-        # deploying to Cros Lab.
         if chameleon_args is not None:
-            self.chameleon = self._chameleon_host.get_chameleond_proxy()
+            self.chameleon = self._chameleon_host.create_chameleon_board()
 
 
     def _create_chameleon_host(self, chameleon_args):
diff --git a/server/site_tests/display_Resolution/display_Resolution.py b/server/site_tests/display_Resolution/display_Resolution.py
index 3ce8f5a..634808d 100644
--- a/server/site_tests/display_Resolution/display_Resolution.py
+++ b/server/site_tests/display_Resolution/display_Resolution.py
@@ -7,7 +7,6 @@
 import logging
 import os
 import time
-import xmlrpclib
 
 from autotest_lib.client.common_lib import error
 from autotest_lib.server import test
@@ -36,14 +35,14 @@
     ]
 
     def initialize(self, host):
-        self._connector_id = None
         self._errors = []
         self._host = host
         self._test_data_dir = os.path.join(
                 self.bindir, 'display_Resolution_test_data')
         self._display_client = display_client.DisplayClient(host)
         self._display_client.initialize(self._test_data_dir)
-        self._chameleon_board = host.chameleon
+        self._chameleon = host.chameleon
+        self._chameleon_port = None
 
     def cleanup(self):
         if self._display_client:
@@ -51,8 +50,8 @@
 
     def run_once(self, host, usb_serial=None, test_mirrored=False,
                  test_suspend_resume=False):
-        self._connector_id, self._connector_type = self.get_external_output()
-        if self._connector_id is None:
+        self._chameleon_port = self.get_connected_port()
+        if self._chameleon_port is None:
             raise error.TestError('DUT and Chameleon board not connected')
 
         for resolution in self.RESOLUTION_TEST_LIST:
@@ -73,29 +72,26 @@
 
                 self.test_display(resolution)
             finally:
-                self._chameleon_board.Reset()
+                self._chameleon.reset()
 
         if self._errors:
             raise error.TestError(', '.join(self._errors))
 
-    def get_external_output(self):
-        """Gets the first available external output port between Chameleon
-        and DUT.
+    def get_connected_port(self):
+        """Gets the first connected output port between Chameleon and DUT.
 
-        @return: A tuple (the ID of Chameleon connector,
-                          the name of Chameleon connector)
+        @return: A ChameleonPort object.
         """
         # TODO(waihong): Support multiple connectors.
-        for connector_id in self._chameleon_board.ProbeInputs():
+        for chameleon_port in self._chameleon.get_all_ports():
             # Plug to ensure the connector is plugged.
-            self._chameleon_board.Plug(connector_id)
-            connector_name = self._chameleon_board.GetConnectorType(
-                    connector_id)
+            chameleon_port.plug()
+            connector_type = chameleon_port.get_connector_type()
             output = self._display_client.get_connector_name()
             # TODO(waihong): Make sure eDP work in this way.
-            if output and output.startswith(connector_name):
-                return (connector_id, connector_name)
-        return (None, None)
+            if output and output.startswith(connector_type):
+                return chameleon_port
+        return None
 
     def set_up_chameleon(self, resolution):
         """Loads the EDID of the given resolution onto Chameleon.
@@ -104,34 +100,16 @@
                 resolution to test.
         """
         logging.info('Setting up %r on port %d (%s)...',
-                     resolution, self._connector_id, self._connector_type)
-
+                     resolution,
+                     self._chameleon_port.get_connector_id(),
+                     self._chameleon_port.get_connector_type())
         edid_filename = os.path.join(
                 self._test_data_dir, 'edids', '%s_%dx%d' % resolution)
         if not os.path.exists(edid_filename):
             raise ValueError('EDID file %r does not exist' % edid_filename)
 
-        logging.info('Create edid: %s', edid_filename)
-        edid_id = self._chameleon_board.CreateEdid(
-                xmlrpclib.Binary(open(edid_filename).read()))
-
-        logging.info('Apply edid %d on port %d (%s)',
-                     edid_id, self._connector_id, self._connector_type)
-        self._chameleon_board.ApplyEdid(self._connector_id, edid_id)
-        self._chameleon_board.DestoryEdid(edid_id)
-
-    # TODO(waihong): Move the frame capture method to Chameleon wrapper.
-    def capture_chameleon_screen(self, file_path):
-        """Captures Chameleon framebuffer.
-
-        @param file_path: The path of file for output.
-
-        @return: The byte-array for the screen.
-        """
-        pixels = self._chameleon_board.DumpPixels(self._connector_id).data
-        # Write to file for debug.
-        open(file_path, 'w+').write(pixels)
-        return pixels
+        logging.info('Apply edid: %s', edid_filename)
+        self._chameleon_port.apply_edid(open(edid_filename).read())
 
     def test_display(self, resolution):
         """Main display testing logic.
@@ -161,7 +139,7 @@
         time.sleep(self.CALIBRATION_IMAGE_SETUP_TIME)
 
         logging.info('Capturing framebuffer on Chameleon.')
-        chameleon_pixels = self.capture_chameleon_screen(chameleon_path)
+        chameleon_pixels = self._chameleon_port.capture_screen(chameleon_path)
         chameleon_pixels_len = len(chameleon_pixels)
 
         logging.info('Capturing framebuffer on DUT.')