Implement CEC feature autotest.
First step is to make sure fizz side works.
BUG=b:79406482
TEST=None
Change-Id: I1db702dfeb5e1ef8ff34839340a9586a9b1476f9
Reviewed-on: https://chromium-review.googlesource.com/1175412
Reviewed-by: Zhongze Hu <frankhu@google.com>
Tested-by: Liang Li <dianwa@chromium.org>
diff --git a/server/site_tests/enterprise_CFM_CEC/chameleon_cecservice/cec_service b/server/site_tests/enterprise_CFM_CEC/chameleon_cecservice/cec_service
new file mode 100755
index 0000000..0403f51
--- /dev/null
+++ b/server/site_tests/enterprise_CFM_CEC/chameleon_cecservice/cec_service
@@ -0,0 +1,73 @@
+#!/usr/bin/python
+
+import sys
+import time
+from time import sleep
+import util
+import it6803
+
+usage = """\
+Usage:
+ cec_service -- print command usage
+ cec_service start -- run cec service
+"""
+
+powerOn = True
+
+def main(cmdline):
+ args = [''] * 4
+ for i, x in enumerate(cmdline):
+ args[i] = x
+ cmd = args[1]
+
+ if cmd == '': cmd = 'help'
+ fname = 'cmd_' + cmd
+
+ if fname in globals():
+ if args[2] == '':
+ globals()[fname]()
+ else:
+ globals()[fname](args[2])
+ else:
+ print 'Unknown command', cmd
+
+def cmd_start():
+ it6803.cec_open()
+ it6803.cec_init()
+ while True:
+ code = it6803.cec_msg_receive()
+ if code is not None:
+ if code == 0x36:
+ handle_standBy()
+ elif code == 0x04:
+ handle_imageOn()
+ elif code == 0x8F:
+ handle_powerStatus()
+ else:
+ print 'Unknow command'
+ it6803.cec_close()
+ return
+
+def handle_standBy():
+ global powerOn
+ powerOn = False
+ return
+
+def handle_imageOn():
+ global powerOn
+ powerOn = True
+ return
+
+def handle_powerStatus():
+ global powerOn
+ print 'power status: {}'.format(powerOn)
+ if powerOn:
+ it6803.cec_msg_poweron()
+ else:
+ it6803.cec_msg_poweroff()
+ it6803.cec_transmit()
+ return
+
+
+if __name__ == '__main__':
+ main(sys.argv)
diff --git a/server/site_tests/enterprise_CFM_CEC/chameleon_cecservice/it6803.py b/server/site_tests/enterprise_CFM_CEC/chameleon_cecservice/it6803.py
new file mode 100755
index 0000000..00f8fa6
--- /dev/null
+++ b/server/site_tests/enterprise_CFM_CEC/chameleon_cecservice/it6803.py
@@ -0,0 +1,285 @@
+#!/usr/bin/python
+
+# Copyright 2014 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+# Reference[1]: IT680x example code:
+# https://drive.google.com/corp/drive/u/0/folders/0B8Lcp5hqbjaqaE5WdDA5alVWOXc
+
+# Reference[2]: IT6803 Programming Guide:
+# https://docs.google.com/viewer?a=v&pid=sites&srcid=\
+# Y2hyb21pdW0ub3JnfGRldnxneDoyNGVmNGFiMDE4ZWJiZDM2
+
+# This code is a library for using IT680X chip in chameleon.
+
+import sys
+import util
+from time import sleep
+
+usage = """\
+Usage:
+ it6803 -- print command usage
+ it6803 cec_reg_print -- print all cec registers value
+ it6803 cec_msg_receive -- print receiving cec message
+ it6803 cec_msg {cmd} -- send cec message
+"""
+
+QUEUE_SIZE = 3
+q_head = 0
+q_tail = 0
+regTxOutState = 3
+
+logicalAddr = 0
+initiatorAddr = 0x0F
+cecTxState = 0
+
+txCmdBuf = [0x00] * 19
+rxCecBuf = [0x00] * 19
+queue = [[0x00 for i in range(19)] for j in range(QUEUE_SIZE)]
+
+# Chameleon register address
+I2C_HDMI = 0x48
+I2C_CEC = 0x4A
+
+# Chameleon CEC control registers
+# (name starts with REG is register addr, followings are values for this reg)
+REG06 = 0x06
+REG_EMPTY = 0x00
+REG07 = 0x07
+ENABLE_CEC_INTERRUPT_PIN = 0x40
+
+REG08 = 0x08
+FIRE_FRAME = 0x80
+DEBUG_CEC_CLEAR = 0x40
+CEC_SCHMITT_TRIGGER = 0x08
+CEC_INTERRUPT = 0x01
+
+REG09 = 0x09
+REGION_SELECT = 0x40
+INITAITOR_RX_CEC = 0x20
+ACKNOWLEDGE = 0x01
+
+REG_MIN_BIT = 0x0B
+REG_TIME_UNIT = 0x0C
+
+REG0F = 0x0F
+IO_PULL_UP = 0x50
+
+REG_TARG_ADDR = 0x22
+REG_MSCOUNT_L = 0x45
+REG_MSCOUNT_M = 0x46
+REG_MSCOUNT_H = 0x47
+REF_INT_STATUS= 0x4C
+
+def main(cmdline):
+ """ Main function. """
+ args = [''] * 4
+ for i, x in enumerate(cmdline):
+ args[i] = x
+ cmd = args[1]
+
+ if cmd == '': cmd = 'help'
+ fname = 'cmd_' + cmd
+
+ cec_open()
+ if fname in globals():
+ if args[2] == '':
+ globals()[fname]()
+ else:
+ globals()[fname](args[2])
+ else:
+ print 'Unknown command', cmd
+ cec_close()
+
+
+def cmd_help():
+ """ Print help message. """
+ print usage
+
+
+def cec_open():
+ """ Enable cec port. """
+ # enable IT6803 CEC port: enable cec clock and assign slave addr
+ i2cset(I2C_HDMI, 0x0E, 0xFF)
+ i2cset(I2C_HDMI, 0x86, 0x95)
+
+def cec_close():
+ """ Close cec port. """
+ # disable cec slave addr
+ i2cset(I2C_HDMI, 0x86, 0x94)
+
+
+def cec_init():
+ """ Initialize cec port in chameleon. """
+ # initial CEC register. From reference[1] Ln480
+
+ # enable it680x cec
+ i2cset(I2C_CEC, 0xF8, 0xC3)
+ i2cset(I2C_CEC, 0xF8, 0xA5)
+ q_head = 0
+ q_tail = 0
+ regTxOutState = 3
+
+ # get 100ms timer, according to ref [1,2]
+ i2cset(I2C_CEC, REG09, ACKNOWLEDGE)
+ sleep(0.099)
+ i2cset(I2C_CEC, REG09, REG_EMPTY)
+ high = util.i2c_read(0, I2C_CEC, REG_MSCOUNT_H, 1)[0] * 0x10000
+ mid = util.i2c_read(0, I2C_CEC, REG_MSCOUNT_M, 1)[0] * 0x100
+ low = util.i2c_read(0, I2C_CEC, REG_MSCOUNT_L, 1)[0]
+ tus = (high + mid + low) / 1000
+ # print tus
+
+ # CEC configuration
+ i2cset(I2C_CEC, REG09, INITAITOR_RX_CEC | REGION_SELECT)
+ i2cset(I2C_CEC, REG_MIN_BIT, 0x14)
+ i2cset(I2C_CEC, REG_TIME_UNIT, tus)
+ i2cset(I2C_CEC, REG_TARG_ADDR, logicalAddr)
+ i2cset(I2C_CEC, REG08, CEC_SCHMITT_TRIGGER)
+ uc = util.i2c_read(0, I2C_CEC, REG09, 1)[0]
+ # i2cset(I2C_CEC, REG09, uc|0x02)
+ # cec_clr_int
+ i2cset(I2C_CEC, REG08, CEC_INTERRUPT|DEBUG_CEC_CLEAR|CEC_SCHMITT_TRIGGER)
+ i2cset(I2C_CEC, REG08, CEC_SCHMITT_TRIGGER|DEBUG_CEC_CLEAR)
+ # print 'logicalAddr: {}, TimeUnit: {}'.format(logicalAddr,tus)
+
+ # Enable CEC interrupt pin
+ reg07_val = util.i2c_read(0, I2C_CEC, REG07, 1)[0]
+ i2cset(I2C_CEC, REG07, reg07_val | ENABLE_CEC_INTERRUPT_PIN)
+
+ # Enable ALL interrupt mask
+ i2cset(I2C_CEC, REG06, REG_EMPTY)
+
+ # IO pull up enable
+ i2cset(I2C_CEC, REG0F, IO_PULL_UP)
+
+def cec_msg_receive():
+ """ Read message received. """
+ # 0x3F means all interrupts are on
+ cecInt = cec_reg_read(REF_INT_STATUS) & 0x3F
+ if 0 != (cecInt & 0x10):
+ if not cec_msg_read():
+ raise Exception('Queue is full!')
+ ## TODO check interrupt register Status
+ i2c_cec_set(REF_INT_STATUS, cecInt)
+ # Decode received message
+ return cec_decode()
+
+
+def cmd_cec_msg(message):
+ """ parent function for a cec message. """
+ cec_init()
+ fname = 'cec_msg_' + message
+ globals()[fname]()
+ cec_transmit()
+
+def cec_msg_standby():
+ """ Send a stand by message. """
+ # F = boardcast, 0x36 = stand by message
+ cec_cmd_set(0xF, 0x36, None, None)
+ # other operations need more assignments
+
+def cec_msg_viewon():
+ """ Send a view on message. """
+ # 0 = TV, 0x04 = image on
+ cec_cmd_set(0x0, 0x04, None, None)
+
+def cec_msg_poweron():
+ """ Make a power on cec message. """
+ global initiatorAddr
+ # 0x90 = power status message
+ cec_cmd_set(initiatorAddr, 0x90, 0x00, None)
+
+def cec_msg_poweroff():
+ """ Make a power off cec message. """
+ global initiatorAddr
+ # 0x90 = power status message
+ cec_cmd_set(initiatorAddr, 0x90, 0x01, None)
+
+def cec_reg_read(offset):
+ """ read it6803's register value from i2c line. """
+ return util.i2c_read(0, I2C_CEC, offset, 1)[0]
+
+def cec_cmd_set(follower, txCmd, operand1, operand2):
+ """ Compose a cec message. """
+ # print 'follower: {}, cmd: {}'.format(follower, txCmd)
+ # TODO set variables
+ txCmdBuf[0] = 2
+ txCmdBuf[1] = (logicalAddr<<4) + follower
+ txCmdBuf[2] = txCmd
+ txCmdBuf[3] = 0
+ txCmdBuf[4] = 0
+ if operand1 is not None:
+ txCmdBuf[3] = operand1
+ txCmdBuf[0] = 3
+ if operand2 is not None:
+ txCmdBuf[4] = operand2
+ txCmdBuf[0] = 4
+ # print txCmdBuf
+ return
+
+def cec_transmit():
+ """ File a cec message out. """
+ # Assume the state is cecTransfer
+ # Set values from 0x10 to 0x23
+ i2c_cec_set(0x23, txCmdBuf[0])
+ for i in range (0, txCmdBuf[0]):
+ i2c_cec_set(0x10+i, txCmdBuf[i+1])
+
+ # Fire command
+ i2c_cec_set(REG08, FIRE_FRAME | CEC_SCHMITT_TRIGGER | DEBUG_CEC_CLEAR)
+ i2c_cec_set(REG08, CEC_SCHMITT_TRIGGER | DEBUG_CEC_CLEAR)
+ return
+
+def cec_msg_read():
+ """ Read incoming cec messages from memory. """
+ global q_head, q_tail
+ if (q_head % QUEUE_SIZE) != (q_tail % QUEUE_SIZE):
+ return False
+ q_tail += 1
+ i = q_tail % QUEUE_SIZE
+ # 0x30 is starting point for receiving message
+ data = util.i2c_read(0, I2C_CEC, 0x30, 19)
+ for j in range(1, 19):
+ queue[i][j] = data[j-1]
+ queue[i][0] = data[18]
+ return True
+
+def cec_decode():
+ """ Process incoming cec message. """
+ global q_head, q_tail, initiatorAddr
+ if (q_head % QUEUE_SIZE) == (q_tail % QUEUE_SIZE):
+ # Queue is empty
+ return
+ q_head += 1
+ rxCecBuf = queue[q_head % QUEUE_SIZE]
+ #print rxCecBuf
+
+ if (rxCecBuf[0] == 1):
+ if logicalAddr == (rxCecBuf[1] & 0x0F):
+ # eReportPhysicalAddress
+ return
+ # Validate message
+ initiatorAddr = (rxCecBuf[1] >> 4) & 0x0F
+ followerAddr = rxCecBuf[1] & 0x0F
+ print 'Initiator: {} Follower: {}'.format(initiatorAddr, followerAddr)
+
+ if (rxCecBuf[2] == 0x04):
+ print 'received image-view-on'
+ elif (rxCecBuf[2] == 0x36):
+ print 'received standby'
+ else:
+ print 'other command: {}'.format(rxCecBuf[2])
+ return rxCecBuf[2]
+
+def i2cset(addr, offset, value):
+ """ set some register value via i2c line. """
+ util.i2c_write(0, addr, offset, [value])
+
+def i2c_cec_set(offset, value):
+ """ set it6803's register value via i2c line. """
+ i2cset(I2C_CEC, offset, value)
+
+if __name__ == '__main__':
+ main(sys.argv)
diff --git a/server/site_tests/enterprise_CFM_CEC/control b/server/site_tests/enterprise_CFM_CEC/control
new file mode 100644
index 0000000..8d61fff
--- /dev/null
+++ b/server/site_tests/enterprise_CFM_CEC/control
@@ -0,0 +1,29 @@
+# Copyright 2018 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+from autotest_lib.server import utils
+
+AUTHOR = "dianwa@google.com"
+NAME = "enterprise_CFM_CEC"
+TIME = "SHORT"
+TEST_CATEGORY = "Functional"
+TEST_CLASS = "enterprise"
+TEST_TYPE = "server"
+
+JOB_RETRIES = 0
+DEPENDENCIES = ""
+
+DOC = """
+This test performs the CEC feature in HDMI cable in Teemo
+"""
+
+args_dict = utils.args_to_dict(args)
+chameleon_args = hosts.CrosHost.get_chameleon_arguments(args_dict)
+
+def run_test(machine):
+ host = hosts.create_host(machine, chameleon_args=chameleon_args)
+ job.run_test(NAME, host=host)
+
+
+parallel_simple(run_test, machines)
diff --git a/server/site_tests/enterprise_CFM_CEC/enterprise_CFM_CEC.py b/server/site_tests/enterprise_CFM_CEC/enterprise_CFM_CEC.py
new file mode 100644
index 0000000..d8d691c
--- /dev/null
+++ b/server/site_tests/enterprise_CFM_CEC/enterprise_CFM_CEC.py
@@ -0,0 +1,226 @@
+# Copyright 2018 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LISENCE file.
+
+import logging
+import re
+import os
+import time
+
+from autotest_lib.client.common_lib import error
+from autotest_lib.server import test
+
+CEC_COMMAND = "sudo -u chronos dbus-send --system --print-reply \
+--type=method_call --dest=org.chromium.CecService \
+/org/chromium/CecService org.chromium.CecService."
+CEC_CMD_STANDBY = "SendStandByToAllDevices"
+CEC_CMD_IMAGEON = "SendWakeUpToAllDevices"
+CEC_CMD_DISP_STATUS = "GetTvsPowerStatus"
+
+CHAMELEON_ROOT = '/home/root/'
+
+STATUS_ERROR = 0
+STATUS_ON = 3
+STATUS_OFF = 4
+STATUS_TO_ON = 5
+STATUS_TO_OFF = 6
+
+class enterprise_CFM_CEC(test.test):
+ """
+ Test CEC feature for display power control.
+ """
+
+ version = 1
+ # TODO: how to get whether it connects to chameleon board
+ chameleon_mode = True
+
+ def initialize(self):
+ """ initialize is a stub function."""
+ pass
+
+ # Simply run a command in machine and return result messages.
+ def _shcmd(self, cmd):
+ """ A simple wrapper for remote shell command execution.
+ @param cmd: shell command for Fizz
+ """
+ logging.info('CMD: [%s]', cmd)
+ try:
+ result = self._client.run(cmd)
+ if result is None:
+ return result
+ if result.stderr:
+ logging.info('CMD ERR:\n' + result.stderr)
+ logging.info('CMD OUT:\n' + result.stdout)
+ return result
+ except Exception as e:
+ logging.info('run command failed. ' + str(e))
+
+ def run_once(self, host=None):
+ """
+ Test scenario:
+
+ If system does not support cec port, we simply throw an exception.
+
+ Generally we are using the build-in cecservice for this test. This
+ service supports multiple features including turning TV(display)
+ power on/off and monitor its power status.
+
+ Following is test plan:
+ 0.0 Copy two python files to chameleon
+ 0.1 enable chameleon as hdmi mode
+ 0.2 run cec service on chameleon
+
+ 0.3 Make sure chrome box is running cecservice
+ 0.4 Make sure chrome box's /dev/cecX is open
+ 0.5 Run TV power status to check configuration correct
+ (end of step 0)
+
+
+ 1.0 Send TV turn-off command
+ 1.1 Check TV power status to check whether it is off.
+ (end of step 1)
+
+ 2.0 Send TV turn-on command
+ 2.1 Check TV power status to check whether it is on.
+ (end of step 2)
+ At every end of step, we have to decide whether we stop this test
+ right now or continue next ones.
+ Note that we could turn on TV first and then turn off it,
+ according to its initial power status.
+
+ 3.0 Stop cec service on chameleon
+ 3.1 Remove python files from chameleon
+
+ @param host: host object machine.
+ """
+
+ self._client = host
+ self.chameleon = host.chameleon
+
+ ## TODO check same port
+ #Step 0.0 - 0.2
+ self.copy_cecservice()
+ self.cec_service_init()
+
+
+ # Step 0.3 - 0.5
+ if not self.is_cecservice_running():
+ self.cec_cleanup()
+ raise error.TestFail("CEC service is not running.")
+ if not self.is_cec_available():
+ self.cec_cleanup()
+ raise error.TestFail("/dev/cecX port is not open.")
+ status = self.check_display_status()
+ if STATUS_ERROR == status:
+ self.cec_cleanup()
+ raise error.TestFail("CEC communication is not good.")
+
+ # Step 1 & 2
+ if STATUS_ON == status:
+ self.test_turn_off()
+ if not self.chameleon_mode:
+ time.sleep(5)
+ self.test_turn_on()
+ else:
+ self.test_turn_on()
+ if not self.chameleon_mode:
+ time.sleep(5)
+ self.test_turn_off()
+
+ # Step 3
+ self.cec_cleanup()
+
+ # Chameleon
+ def copy_cecservice(self):
+ """ copy python files under ./chameleon_cecservice to chameleon.
+ In that folder, we have two files for cecservice.
+ """
+ current_dir = os.path.dirname(os.path.realpath(__file__))
+ base_dir = current_dir + '/chameleon_cecservice/'
+ self.chameleon.host.send_file(base_dir + 'cec_service', CHAMELEON_ROOT)
+ self.chameleon.host.send_file(base_dir + 'it6803.py', CHAMELEON_ROOT)
+
+ def cec_service_init(self):
+ """ Setup chameleon board as a hdmi mode.
+ Run cec service on chameleon
+ """
+ self.chameleon.host.run('/home/root/setup hdmi')
+ self.chameleon.host.run('(/home/root/cec_service start) '\
+ '</dev/null >/dev/null 2>&1 & echo -n $!')
+
+ def cec_cleanup(self):
+ """ Stop cec service on chameleon.
+ Delete files new coming on chameleon.
+ """
+ if self.chameleon_mode:
+ stop_cmd = 'kill $(ps | grep \'cec_service\' | awk \'{print $1}\')'
+ self.chameleon.host.run(stop_cmd)
+ cleanup_cmd = 'rm /home/root/cec_service /home/root/it6803*'
+ self.chameleon.host.run(cleanup_cmd)
+
+ # Fizz
+ def is_cecservice_running(self):
+ """ try to confirm that current box is running cecservice.
+ @return: whether cecservice is running in Fizz.
+ """
+ cmd = 'initctl list | grep cecservice'
+ result = self._shcmd(cmd)
+ if result is None:
+ return False;
+ if "running" not in result.stdout:
+ return False;
+ return True;
+
+ def is_cec_available(self):
+ """ try to get whether the system makes /dev/cecX open.
+ @return: whether /dev/cecX is open in Fizz
+ """
+ cmd = "ls /dev/cec*"
+ result = self._shcmd(cmd)
+ if result is None:
+ return False;
+ return True;
+
+ def check_display_status(self):
+ """ try to confirm that current box connected to a display
+ which supports cec feature.
+ @return: current display power status
+ """
+ cmd = CEC_COMMAND + CEC_CMD_DISP_STATUS
+ result = self._shcmd(cmd).stdout
+ status = re.findall('int32 \\d', result)
+ for s in status:
+ code = int(s[-1]);
+ if code == STATUS_ON or code == STATUS_TO_ON:
+ return STATUS_ON
+ if code == STATUS_OFF or code == STATUS_TO_OFF:
+ return STATUS_OFF
+ return STATUS_ERROR
+
+ def display_on(self):
+ """ send a power turn on cec message """
+ self._shcmd(CEC_COMMAND + CEC_CMD_IMAGEON)
+
+ def display_off(self):
+ """ send a power turn off cec message"""
+ self._shcmd(CEC_COMMAND + CEC_CMD_STANDBY)
+
+ def test_turn_on(self):
+ """ test sending turn_on cec message process. """
+ self.display_on()
+ if not self.chameleon_mode:
+ time.sleep(10)
+ status = self.check_display_status()
+ if STATUS_ON != status:
+ self.cec_cleanup()
+ raise error.TestFail("CEC display image on does not work.")
+
+ def test_turn_off(self):
+ """ test sending turn_off cec message process. """
+ self.display_off()
+ if not self.chameleon_mode:
+ time.sleep(1)
+ status = self.check_display_status()
+ if STATUS_OFF != status:
+ self.cec_cleanup()
+ raise error.TestFail("CEC display standby does not work.")