Abstract methods in ChameleonBoard and ChameleonPort
The abstraction of ChameleonBoard and ChameleonPort hide the underlying
XMLRPC connection and provide some helper methods.
BUG=chromium:336731
TEST=manual
Run display_Resolution test on Link. Still work fine.
Change-Id: Ic987949152e21995f11750ac62e27b51401f586b
Reviewed-on: https://chromium-review.googlesource.com/183436
Reviewed-by: Hung-ying Tyan <tyanh@chromium.org>
Commit-Queue: Wai-Hong Tam <waihong@chromium.org>
Tested-by: Wai-Hong Tam <waihong@chromium.org>
diff --git a/server/cros/chameleon/chameleon.py b/server/cros/chameleon/chameleon.py
new file mode 100644
index 0000000..7078fa6
--- /dev/null
+++ b/server/cros/chameleon/chameleon.py
@@ -0,0 +1,128 @@
+# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import xmlrpclib
+
+
+class ChameleonBoard(object):
+ """ChameleonBoard is an abstraction of a Chameleon board.
+
+ A Chameleond RPC proxy is passed to the construction such that it can
+ use this proxy to control the Chameleon board.
+ """
+
+ def __init__(self, chameleond_proxy):
+ """Construct a ChameleonBoard.
+
+ @param chameleond_proxy: Chameleond RPC proxy object.
+ """
+ self._chameleond_proxy = chameleond_proxy
+
+
+ def reset(self):
+ """Resets Chameleon board."""
+ self._chameleond_proxy.Reset()
+
+
+ def get_all_ports(self):
+ """Gets all the ports on Chameleon board which are connected.
+
+ @return: A list of ChameleonPort objects.
+ """
+ ports = self._chameleond_proxy.ProbeInputs()
+ return [ChameleonPort(self._chameleond_proxy, port) for port in ports]
+
+
+class ChameleonPort(object):
+ """ChameleonPort is an abstraction of a port of a Chameleon board.
+
+ A Chameleond RPC proxy and an input_id are passed to the construction.
+ The input_id is the unique identity to the port.
+ """
+
+ def __init__(self, chameleond_proxy, input_id):
+ """Construct a ChameleonPort.
+
+ @param chameleond_proxy: Chameleond RPC proxy object.
+ @param input_id: The ID of the input port.
+ """
+ self._chameleond_proxy = chameleond_proxy
+ self._input_id = input_id
+
+
+ def get_connector_id(self):
+ """Returns the connector ID.
+
+ @return: A number of connector ID.
+ """
+ return self._input_id
+
+
+ def get_connector_type(self):
+ """Returns the human readable string for the connector type.
+
+ @return: A string, like "VGA", "DVI", "HDMI", or "DP".
+ """
+ return self._chameleond_proxy.GetConnectorType(self._input_id)
+
+
+ def read_edid(self):
+ """Reads the EDID.
+
+ @return: A byte-array of EDID.
+ """
+ return self._chameleond_proxy.ReadEdid(self._input_id).data
+
+
+ def apply_edid(self, edid_data):
+ """Applies the given EDID.
+
+ @param edid_data: A byte-array of EDID.
+ """
+ edid_id = self._chameleond_proxy.CreateEdid(xmlrpclib.Binary(edid_data))
+ self._chameleond_proxy.ApplyEdid(self._input_id, edid_id)
+ self._chameleond_proxy.DestoryEdid(edid_id)
+
+
+ def plug(self):
+ """Asserts HPD line to high, emulating plug."""
+ self._chameleond_proxy.Plug(self._input_id)
+
+
+ def unplug(self):
+ """Deasserts HPD line to low, emulating unplug."""
+ self._chameleond_proxy.Unplug(self._input_id)
+
+
+ def fire_hpd_pulse(self, deassert_interval_usec, assert_interval_usec=None,
+ repeat_count=1):
+ """Fires a HPD pulse (high -> low -> high) or multiple HPD pulses.
+
+ @param deassert_interval_usec: The time of the deassert pulse.
+ @param assert_interval_usec: The time of the assert pulse.
+ @param repeat_count: The count of repeating the HPD pulses.
+ """
+ self._chameleond_proxy.FireHpdPulse(
+ self._input_id, deassert_interval_usec, assert_interval_usec,
+ repeat_count)
+
+
+ def capture_screen(self, file_path):
+ """Captures Chameleon framebuffer.
+
+ @param file_path: The path of file for output.
+
+ @return The byte-array for the screen.
+ """
+ pixels = self._chameleond_proxy.DumpPixels(self._input_id).data
+ open(file_path, 'w+').write(pixels)
+ return pixels
+
+
+ def get_resolution(self):
+ """Gets the source resolution.
+
+ @return: A (width, height) tuple.
+ """
+ return self._chameleond_proxy.DetectResolution(self._input_id)
diff --git a/server/hosts/chameleon_host.py b/server/hosts/chameleon_host.py
index 04457dc..10e451c 100644
--- a/server/hosts/chameleon_host.py
+++ b/server/hosts/chameleon_host.py
@@ -9,6 +9,7 @@
import xmlrpclib
from autotest_lib.client.bin import utils
+from autotest_lib.server.cros.chameleon import chameleon
from autotest_lib.server.hosts import ssh_host
@@ -63,16 +64,6 @@
return self._is_in_lab
- def get_chameleond_proxy(self):
- """Return a proxy that can be used to communicate with chameleond.
-
- @returns: An xmlrpclib.ServerProxy that is connected to the chameleond
- on the host.
-
- """
- return self._chameleond_proxy
-
-
def get_wait_up_processes(self):
"""Get the list of local processes to wait for in wait_up.
@@ -85,3 +76,9 @@
processes = [self.CHAMELEOND_PROCESS]
return processes
+
+ def create_chameleon_board(self):
+ """Create a ChameleonBoard object."""
+ # TODO(waihong): Add verify and repair logic which are required while
+ # deploying to Cros Lab.
+ return chameleon.ChameleonBoard(self._chameleond_proxy)
diff --git a/server/hosts/cros_host.py b/server/hosts/cros_host.py
index 65acc52..373e6ee 100644
--- a/server/hosts/cros_host.py
+++ b/server/hosts/cros_host.py
@@ -285,13 +285,8 @@
# a servo is required, i.e. when the servo_args is not None.
if servo_args is not None:
self.servo = self._servo_host.create_healthy_servo_object()
- # TODO(waihong): Create a wrapper for the Chameleond Proxy such that
- # we can hide the RPC specific logic, like wrapping the binary data
- # in xmlrpclib.Binary().
- # TODO(waihong): Add verify and repair logic which are required while
- # deploying to Cros Lab.
if chameleon_args is not None:
- self.chameleon = self._chameleon_host.get_chameleond_proxy()
+ self.chameleon = self._chameleon_host.create_chameleon_board()
def _create_chameleon_host(self, chameleon_args):
diff --git a/server/site_tests/display_Resolution/display_Resolution.py b/server/site_tests/display_Resolution/display_Resolution.py
index 3ce8f5a..634808d 100644
--- a/server/site_tests/display_Resolution/display_Resolution.py
+++ b/server/site_tests/display_Resolution/display_Resolution.py
@@ -7,7 +7,6 @@
import logging
import os
import time
-import xmlrpclib
from autotest_lib.client.common_lib import error
from autotest_lib.server import test
@@ -36,14 +35,14 @@
]
def initialize(self, host):
- self._connector_id = None
self._errors = []
self._host = host
self._test_data_dir = os.path.join(
self.bindir, 'display_Resolution_test_data')
self._display_client = display_client.DisplayClient(host)
self._display_client.initialize(self._test_data_dir)
- self._chameleon_board = host.chameleon
+ self._chameleon = host.chameleon
+ self._chameleon_port = None
def cleanup(self):
if self._display_client:
@@ -51,8 +50,8 @@
def run_once(self, host, usb_serial=None, test_mirrored=False,
test_suspend_resume=False):
- self._connector_id, self._connector_type = self.get_external_output()
- if self._connector_id is None:
+ self._chameleon_port = self.get_connected_port()
+ if self._chameleon_port is None:
raise error.TestError('DUT and Chameleon board not connected')
for resolution in self.RESOLUTION_TEST_LIST:
@@ -73,29 +72,26 @@
self.test_display(resolution)
finally:
- self._chameleon_board.Reset()
+ self._chameleon.reset()
if self._errors:
raise error.TestError(', '.join(self._errors))
- def get_external_output(self):
- """Gets the first available external output port between Chameleon
- and DUT.
+ def get_connected_port(self):
+ """Gets the first connected output port between Chameleon and DUT.
- @return: A tuple (the ID of Chameleon connector,
- the name of Chameleon connector)
+ @return: A ChameleonPort object.
"""
# TODO(waihong): Support multiple connectors.
- for connector_id in self._chameleon_board.ProbeInputs():
+ for chameleon_port in self._chameleon.get_all_ports():
# Plug to ensure the connector is plugged.
- self._chameleon_board.Plug(connector_id)
- connector_name = self._chameleon_board.GetConnectorType(
- connector_id)
+ chameleon_port.plug()
+ connector_type = chameleon_port.get_connector_type()
output = self._display_client.get_connector_name()
# TODO(waihong): Make sure eDP work in this way.
- if output and output.startswith(connector_name):
- return (connector_id, connector_name)
- return (None, None)
+ if output and output.startswith(connector_type):
+ return chameleon_port
+ return None
def set_up_chameleon(self, resolution):
"""Loads the EDID of the given resolution onto Chameleon.
@@ -104,34 +100,16 @@
resolution to test.
"""
logging.info('Setting up %r on port %d (%s)...',
- resolution, self._connector_id, self._connector_type)
-
+ resolution,
+ self._chameleon_port.get_connector_id(),
+ self._chameleon_port.get_connector_type())
edid_filename = os.path.join(
self._test_data_dir, 'edids', '%s_%dx%d' % resolution)
if not os.path.exists(edid_filename):
raise ValueError('EDID file %r does not exist' % edid_filename)
- logging.info('Create edid: %s', edid_filename)
- edid_id = self._chameleon_board.CreateEdid(
- xmlrpclib.Binary(open(edid_filename).read()))
-
- logging.info('Apply edid %d on port %d (%s)',
- edid_id, self._connector_id, self._connector_type)
- self._chameleon_board.ApplyEdid(self._connector_id, edid_id)
- self._chameleon_board.DestoryEdid(edid_id)
-
- # TODO(waihong): Move the frame capture method to Chameleon wrapper.
- def capture_chameleon_screen(self, file_path):
- """Captures Chameleon framebuffer.
-
- @param file_path: The path of file for output.
-
- @return: The byte-array for the screen.
- """
- pixels = self._chameleon_board.DumpPixels(self._connector_id).data
- # Write to file for debug.
- open(file_path, 'w+').write(pixels)
- return pixels
+ logging.info('Apply edid: %s', edid_filename)
+ self._chameleon_port.apply_edid(open(edid_filename).read())
def test_display(self, resolution):
"""Main display testing logic.
@@ -161,7 +139,7 @@
time.sleep(self.CALIBRATION_IMAGE_SETUP_TIME)
logging.info('Capturing framebuffer on Chameleon.')
- chameleon_pixels = self.capture_chameleon_screen(chameleon_path)
+ chameleon_pixels = self._chameleon_port.capture_screen(chameleon_path)
chameleon_pixels_len = len(chameleon_pixels)
logging.info('Capturing framebuffer on DUT.')