faft: Refactor Chrome EC related methods into a class
So far the Chrome EC related methods have been scattered across tests.
We should have a single place to control all the Chrome EC functions.
This CL takes the first step by moving the UART command methods into
the ChromeEC class.
BUG=chromium-os:35254
TEST=run_remote_tests.sh --board link --remote dut suite:faft_ec
Change-Id: If148170bcf6801a2a33f666b8a5a7fadc47f3243
Reviewed-on: https://gerrit.chromium.org/gerrit/35403
Reviewed-by: Vic Yang <victoryang@chromium.org>
Reviewed-by: Mike Truty <truty@chromium.org>
Commit-Ready: Tom Wai-Hong Tam <waihong@chromium.org>
Tested-by: Tom Wai-Hong Tam <waihong@chromium.org>
diff --git a/server/cros/chrome_ec.py b/server/cros/chrome_ec.py
new file mode 100644
index 0000000..4095150
--- /dev/null
+++ b/server/cros/chrome_ec.py
@@ -0,0 +1,71 @@
+# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import ast
+
+from autotest_lib.client.common_lib import error
+
+class ChromeEC(object):
+ """Manages control of a Chrome EC.
+
+ We control the Chrome EC via the UART of a Servo board. Chrome EC
+ provides many interfaces to set and get its behavior via console commands.
+ This class is to abstract these interfaces.
+ """
+
+ def __init__(self, servo):
+ """Initialize and keep the servo object.
+
+ Args:
+ servo: A Servo object.
+ """
+ self._servo = servo
+
+
+ def send_command(self, command):
+ """Send command through UART.
+
+ This function opens UART pty when called, and then command is sent
+ through UART.
+
+ Args:
+ command: The command string to send.
+ """
+ self._servo.set('ec_uart_regexp', 'None')
+ self._servo.set_nocheck('ec_uart_cmd', command)
+
+
+ def send_command_get_output(self, command, regexp_list, timeout=1):
+ """Send command through UART and wait for response.
+
+ This function waits for response message matching regular expressions.
+
+ Args:
+ command: The command sent.
+ regexp_list: List of regular expressions used to match response
+ message. Note, list must be ordered.
+
+ Returns:
+ List of tuples, each of which contains the entire matched string and
+ all the subgroups of the match. None if not matched.
+ For example:
+ response of the given command:
+ High temp: 37.2
+ Low temp: 36.4
+ regexp_list:
+ ['High temp: (\d+)\.(\d+)', 'Low temp: (\d+)\.(\d+)']
+ returns:
+ [('High temp: 37.2', '37', '2'), ('Low temp: 36.4', '36', '4')]
+
+ Raises:
+ error.TestError: An error when the given regexp_list is not valid.
+ """
+ if not isinstance(regexp_list, list):
+ raise error.TestError('Arugment regexp_list is not a list: %s' %
+ str(regexp_list))
+
+ self._servo.set('ec_uart_timeout', str(float(timeout)))
+ self._servo.set('ec_uart_regexp', str(regexp_list))
+ self._servo.set_nocheck('ec_uart_cmd', command)
+ return ast.literal_eval(self._servo.get('ec_uart_cmd'))
diff --git a/server/cros/faftsequence.py b/server/cros/faftsequence.py
index c08c5ad..84a4ab8 100644
--- a/server/cros/faftsequence.py
+++ b/server/cros/faftsequence.py
@@ -2,7 +2,6 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-import ast
import ctypes
import logging
import os
@@ -10,11 +9,11 @@
import sys
import tempfile
import time
-import xmlrpclib
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.server.cros import vboot_constants as vboot
+from autotest_lib.server.cros.chrome_ec import ChromeEC
from autotest_lib.server.cros.faft_client_attribute import FAFTClientAttribute
from autotest_lib.server.cros.servo_test import ServoTest
from autotest_lib.site_utils import lab_test
@@ -177,6 +176,9 @@
self.client_attr = FAFTClientAttribute(
self.faft_client.get_platform_name())
+ if self.client_attr.chrome_ec:
+ self.ec = ChromeEC(self.servo)
+
# Setting up key matrix mapping
self.servo.set_key_matrix(self.client_attr.key_matrix_layout)
@@ -409,54 +411,6 @@
})
- def send_uart_command(self, command):
- """Send command through UART.
-
- This function open UART pty when called, and then command is sent
- through UART.
-
- Args:
- command: The command string to send.
- """
- self.servo.set('ec_uart_regexp', 'None')
- self.servo.set_nocheck('ec_uart_cmd', command)
-
-
- def send_uart_command_get_output(self, command, regexp_list, timeout=1):
- """Send command through UART and wait for response.
-
- This function waits for response message matching regular expressions.
-
- Args:
- command: The command sent.
- regexp_list: List of regular expressions used to match response
- message. Note, list must be ordered.
-
- Returns:
- List of tuples, each of which contains the entire matched string and
- all the subgroups of the match. None if not matched.
- For example:
- response of the given command:
- High temp: 37.2
- Low temp: 36.4
- regexp_list:
- ['High temp: (\d+)\.(\d+)', 'Low temp: (\d+)\.(\d+)']
- returns:
- [('High temp: 37.2', '37', '2'), ('Low temp: 36.4', '36', '4')]
-
- Raises:
- error.TestError: An error when the given regexp_list is not valid.
- """
- if not isinstance(regexp_list, list):
- raise error.TestError('Arugment regexp_list is not a list: %s' %
- str(regexp_list))
-
- self.servo.set('ec_uart_timeout', str(float(timeout)))
- self.servo.set('ec_uart_regexp', str(regexp_list))
- self.servo.set_nocheck('ec_uart_cmd', command)
- return ast.literal_eval(self.servo.get('ec_uart_cmd'))
-
-
def check_ec_capability(self, required_cap=[], suppress_warning=False):
"""Check if current platform has required EC capabilities.
@@ -754,13 +708,13 @@
self.set_hardware_write_protect(enabled)
if enabled:
# Set write protect flag and reboot to take effect.
- self.send_uart_command("flashwp enable")
+ self.ec.send_command("flashwp enable")
self.sync_and_ec_reboot()
else:
# Reboot after deasserting hardware write protect pin to deactivate
# write protect. And then remove software write protect flag.
self.sync_and_ec_reboot()
- self.send_uart_command("flashwp disable")
+ self.ec.send_command("flashwp disable")
def send_ctrl_d_to_dut(self):
@@ -888,9 +842,9 @@
time.sleep(self.COLD_RESET_DELAY)
self.servo.set('cold_reset', 'off')
time.sleep(self.EC_BOOT_DELAY)
- self.send_uart_command("reboot ap-off")
+ self.ec.send_command("reboot ap-off")
time.sleep(self.EC_BOOT_DELAY)
- self.send_uart_command("hostevent set 0x4000")
+ self.ec.send_command("hostevent set 0x4000")
self.servo.power_short_press()
else:
self.servo.enable_recovery_mode()
diff --git a/server/site_tests/firmware_ECAdc/firmware_ECAdc.py b/server/site_tests/firmware_ECAdc/firmware_ECAdc.py
index 67c3e8e..0ffe132 100644
--- a/server/site_tests/firmware_ECAdc/firmware_ECAdc.py
+++ b/server/site_tests/firmware_ECAdc/firmware_ECAdc.py
@@ -24,7 +24,7 @@
error.TestFail: Raised when read fails.
"""
try:
- t = int(self.send_uart_command_get_output("ectemp",
+ t = int(self.ec.send_command_get_output("ectemp",
["EC temperature is (\d+) K"])[0][1])
if t < 273 or t > 373:
raise error.TestFail("Abnormal EC temperature %d K" % t)
diff --git a/server/site_tests/firmware_ECBattery/firmware_ECBattery.py b/server/site_tests/firmware_ECBattery/firmware_ECBattery.py
index d5c71c5..f1f29c4 100644
--- a/server/site_tests/firmware_ECBattery/firmware_ECBattery.py
+++ b/server/site_tests/firmware_ECBattery/firmware_ECBattery.py
@@ -102,7 +102,7 @@
error.TestFail: Raised when battery tempearture is higher than
BATTERY_TEMP_UPPER_BOUND or lower than BATTERY_TEMP_LOWER_BOUND.
"""
- battery_temp = float(self.send_uart_command_get_output("battery",
+ battery_temp = float(self.ec.send_command_get_output("battery",
["Temp:.+\(([0-9\.]+) C\)"])[0][1])
logging.info("Battery temperature is %f C" % battery_temp)
if (battery_temp > self.BATTERY_TEMP_UPPER_BOUND or
diff --git a/server/site_tests/firmware_ECBootTime/firmware_ECBootTime.py b/server/site_tests/firmware_ECBootTime/firmware_ECBootTime.py
index 09ccbfe..f6599c5 100644
--- a/server/site_tests/firmware_ECBootTime/firmware_ECBootTime.py
+++ b/server/site_tests/firmware_ECBootTime/firmware_ECBootTime.py
@@ -23,9 +23,9 @@
boot_msg = ("([0-9\.]+) Port 80"
if self._x86 else "([0-9\.]+) AP running")
power_cmd = "powerbtn" if self._x86 else "power on"
- reboot = self.send_uart_command_get_output("reboot ap-off",
+ reboot = self.ec.send_command_get_output("reboot ap-off",
["([0-9\.]+) Inits done"])
- power_press = self.send_uart_command_get_output(power_cmd,
+ power_press = self.ec.send_command_get_output(power_cmd,
["\[([0-9\.]+) PB", boot_msg], timeout=3)
reboot_time = float(reboot[0][1])
power_press_time = float(power_press[0][1])
diff --git a/server/site_tests/firmware_ECCharging/firmware_ECCharging.py b/server/site_tests/firmware_ECCharging/firmware_ECCharging.py
index 830719e..37808a7 100644
--- a/server/site_tests/firmware_ECCharging/firmware_ECCharging.py
+++ b/server/site_tests/firmware_ECCharging/firmware_ECCharging.py
@@ -24,7 +24,7 @@
def _get_battery_desired_voltage(self):
"""Get battery desired voltage value."""
- voltage = int(self.send_uart_command_get_output("battery",
+ voltage = int(self.ec.send_command_get_output("battery",
["V-desired:\s+0x[0-9a-f]*\s+=\s+(\d+)\s+mV"])[0][1])
logging.info("Battery desired voltage = %d mV" % voltage)
return voltage
@@ -32,7 +32,7 @@
def _get_battery_desired_current(self):
"""Get battery desired current value."""
- current = int(self.send_uart_command_get_output("battery",
+ current = int(self.ec.send_command_get_output("battery",
["I-desired:\s+0x[0-9a-f]*\s+=\s+(\d+)\s+mA"])[0][1])
logging.info("Battery desired current = %d mA" % current)
return current
@@ -40,7 +40,7 @@
def _get_battery_actual_voltage(self):
"""Get the actual voltage from charger to battery."""
- voltage = int(self.send_uart_command_get_output("battery",
+ voltage = int(self.ec.send_command_get_output("battery",
["V:\s+0x[0-9a-f]*\s+=\s+(\d+)\s+mV"])[0][1])
logging.info("Battery actual voltage = %d mV" % voltage)
return voltage
@@ -48,7 +48,7 @@
def _get_battery_actual_current(self):
"""Get the actual current from charger to battery."""
- current = int(self.send_uart_command_get_output("battery",
+ current = int(self.ec.send_command_get_output("battery",
["I:\s+0x[0-9a-f]*\s+=\s+([0-9-]+)\s+mA"])[0][1])
logging.info("Battery actual current = %d mA" % current)
return current
@@ -56,7 +56,7 @@
def _get_battery_charge(self):
"""Get battery charge state."""
- charge = int(self.send_uart_command_get_output("battery",
+ charge = int(self.ec.send_command_get_output("battery",
["Charge:\s+(\d+)\s+"])[0][1])
logging.info("Battery charge = %d %%" % charge)
return charge
@@ -64,7 +64,7 @@
def _get_charger_target_voltage(self):
"""Get target charging voltage set in charger."""
- voltage = int(self.send_uart_command_get_output("charger",
+ voltage = int(self.ec.send_command_get_output("charger",
["V_batt:\s+(\d+)\s"])[0][1])
logging.info("Charger target voltage = %d mV" % voltage)
return voltage
@@ -72,7 +72,7 @@
def _get_charger_target_current(self):
"""Get target charging current set in charger."""
- current = int(self.send_uart_command_get_output("charger",
+ current = int(self.ec.send_command_get_output("charger",
["I_batt:\s+(\d+)\s"])[0][1])
logging.info("Charger target current = %d mA" % current)
return current
diff --git a/server/site_tests/firmware_ECKeyboard/firmware_ECKeyboard.py b/server/site_tests/firmware_ECKeyboard/firmware_ECKeyboard.py
index 27bd09b..c768f4b 100644
--- a/server/site_tests/firmware_ECKeyboard/firmware_ECKeyboard.py
+++ b/server/site_tests/firmware_ECKeyboard/firmware_ECKeyboard.py
@@ -48,13 +48,13 @@
def key_down(self, keyname):
"""Simulate pressing a key."""
- self.send_uart_command('kbpress %d %d 1' %
+ self.ec.send_command('kbpress %d %d 1' %
(KEYMATRIX[keyname][1], KEYMATRIX[keyname][0]))
def key_up(self, keyname):
"""Simulate releasing a key."""
- self.send_uart_command('kbpress %d %d 0' %
+ self.ec.send_command('kbpress %d %d 0' %
(KEYMATRIX[keyname][1], KEYMATRIX[keyname][0]))
diff --git a/server/site_tests/firmware_ECPeci/firmware_ECPeci.py b/server/site_tests/firmware_ECPeci/firmware_ECPeci.py
index 41bc51f..4c911ec 100644
--- a/server/site_tests/firmware_ECPeci/firmware_ECPeci.py
+++ b/server/site_tests/firmware_ECPeci/firmware_ECPeci.py
@@ -24,7 +24,7 @@
error.TestFail: Raised when read fails.
"""
try:
- t = int(self.send_uart_command_get_output("pecitemp",
+ t = int(self.ec.send_command_get_output("pecitemp",
["CPU temp = (\d+) K"])[0][1])
if t < 273 or t > 400:
raise error.TestFail("Abnormal CPU temperature %d K" % t)
diff --git a/server/site_tests/firmware_ECPowerG3/firmware_ECPowerG3.py b/server/site_tests/firmware_ECPowerG3/firmware_ECPowerG3.py
index 3dc7e8b..73c1b75 100644
--- a/server/site_tests/firmware_ECPowerG3/firmware_ECPowerG3.py
+++ b/server/site_tests/firmware_ECPowerG3/firmware_ECPowerG3.py
@@ -42,9 +42,9 @@
while timeout > 0:
try:
timeout = timeout - 1
- self.send_uart_command_get_output("powerinfo",
- [reg_ex],
- timeout=1)
+ self.ec.send_command_get_output("powerinfo",
+ [reg_ex],
+ timeout=1)
return True
except:
pass
diff --git a/server/site_tests/firmware_ECSharedMem/firmware_ECSharedMem.py b/server/site_tests/firmware_ECSharedMem/firmware_ECSharedMem.py
index 54cd9c8..0d4d372 100644
--- a/server/site_tests/firmware_ECSharedMem/firmware_ECSharedMem.py
+++ b/server/site_tests/firmware_ECSharedMem/firmware_ECSharedMem.py
@@ -25,8 +25,8 @@
def shared_mem_checker(self):
- match = self.send_uart_command_get_output("shmem",
- ["Size:\s+([0-9-]+)\r"])[0]
+ match = self.ec.send_command_get_output("shmem",
+ ["Size:\s+([0-9-]+)\r"])[0]
shmem_size = int(match[1])
logging.info("EC shared memory size if %d bytes", shmem_size)
if shmem_size <= 0:
@@ -37,7 +37,7 @@
def jump_checker(self):
- self.send_uart_command("sysjump RW")
+ self.ec.send_command("sysjump RW")
time.sleep(self.EC_BOOT_DELAY)
return self.shared_mem_checker()
@@ -48,7 +48,7 @@
self.register_faft_sequence((
{ # Step 1, check shared memory in normal operation and crash EC
'state_checker': self.shared_mem_checker,
- 'reboot_action': (self.send_uart_command, "crash unaligned")
+ 'reboot_action': (self.ec.send_command, "crash unaligned")
},
{ # Step 2, Check shared memory after crash and system jump
'state_checker': (lambda: self.shared_mem_checker() and
diff --git a/server/site_tests/firmware_ECThermal/firmware_ECThermal.py b/server/site_tests/firmware_ECThermal/firmware_ECThermal.py
index 7b72d44..d6f27aa 100644
--- a/server/site_tests/firmware_ECThermal/firmware_ECThermal.py
+++ b/server/site_tests/firmware_ECThermal/firmware_ECThermal.py
@@ -96,7 +96,7 @@
self._fan_steps = list()
expected_pat = (["Lowest speed: ([0-9-]+) RPM"] +
["\d+ K:\s+([0-9-]+) RPM"] * num_steps)
- match = self.send_uart_command_get_output("thermalfan 0", expected_pat)
+ match = self.ec.send_command_get_output("thermalfan 0", expected_pat)
for m in match:
self._fan_steps.append(int(m[1]))
diff --git a/server/site_tests/firmware_ECUsbPorts/firmware_ECUsbPorts.py b/server/site_tests/firmware_ECUsbPorts/firmware_ECUsbPorts.py
index 1f84dbc..44fb25a 100644
--- a/server/site_tests/firmware_ECUsbPorts/firmware_ECUsbPorts.py
+++ b/server/site_tests/firmware_ECUsbPorts/firmware_ECUsbPorts.py
@@ -58,7 +58,7 @@
while limit > 0:
try:
gpio_name = "USB%d_ENABLE" % (cnt + 1)
- self.send_uart_command_get_output(
+ self.ec.send_command_get_output(
"gpioget %s" % gpio_name,
["[01].\s*%s" % gpio_name],
timeout=1)
@@ -87,7 +87,7 @@
timeout = timeout - 1
for idx in xrange(1, port_count+1):
gpio_name = "USB%d_ENABLE" % idx
- self.send_uart_command_get_output(
+ self.ec.send_command_get_output(
"gpioget %s" % gpio_name,
["0.\s*%s" % gpio_name],
timeout=1)
diff --git a/server/site_tests/firmware_ECWakeSource/firmware_ECWakeSource.py b/server/site_tests/firmware_ECWakeSource/firmware_ECWakeSource.py
index 37905dc..d368b30 100644
--- a/server/site_tests/firmware_ECWakeSource/firmware_ECWakeSource.py
+++ b/server/site_tests/firmware_ECWakeSource/firmware_ECWakeSource.py
@@ -77,7 +77,7 @@
"""Shutdown and hibernate EC. Then wake by power button."""
self.faft_client.run_shell_command("shutdown -P now")
time.sleep(self.SHUTDOWN_DELAY)
- self.send_uart_command("hibernate 1000")
+ self.ec.send_command("hibernate 1000")
time.sleep(self.WAKE_DELAY)
self.servo.power_short_press()
diff --git a/server/site_tests/firmware_ECWatchdog/firmware_ECWatchdog.py b/server/site_tests/firmware_ECWatchdog/firmware_ECWatchdog.py
index 9e28265..b46533d 100644
--- a/server/site_tests/firmware_ECWatchdog/firmware_ECWatchdog.py
+++ b/server/site_tests/firmware_ECWatchdog/firmware_ECWatchdog.py
@@ -31,7 +31,7 @@
Trigger a watchdog reset.
"""
self.faft_client.run_shell_command("sync")
- self.send_uart_command("waitms %d" % self.WATCHDOG_DELAY)
+ self.ec.send_command("waitms %d" % self.WATCHDOG_DELAY)
time.sleep((self.WATCHDOG_DELAY + self.EC_BOOT_DELAY) / 1000.0)
self.check_lid_and_power_on()
diff --git a/server/site_tests/firmware_ECWriteProtect/firmware_ECWriteProtect.py b/server/site_tests/firmware_ECWriteProtect/firmware_ECWriteProtect.py
index 9ec010b..25581c6 100644
--- a/server/site_tests/firmware_ECWriteProtect/firmware_ECWriteProtect.py
+++ b/server/site_tests/firmware_ECWriteProtect/firmware_ECWriteProtect.py
@@ -33,7 +33,7 @@
- all_now
"""
try:
- self.send_uart_command_get_output("flashinfo",
+ self.ec.send_command_get_output("flashinfo",
["Flags:\s+wp_gpio_asserted\s+ro_at_boot\s+ro_now\s+all_now"],
timeout=0.1)
return True
diff --git a/server/site_tests/firmware_FAFTSetup/firmware_FAFTSetup.py b/server/site_tests/firmware_FAFTSetup/firmware_FAFTSetup.py
index 7cf0820..fd782b3 100644
--- a/server/site_tests/firmware_FAFTSetup/firmware_FAFTSetup.py
+++ b/server/site_tests/firmware_FAFTSetup/firmware_FAFTSetup.py
@@ -50,9 +50,9 @@
"RO:\s+[^\r\n]*\r\n",
"RW:\s+[^\r\n]*\r\n",
"Build:\s+[^\r\n]*\r\n"]
- self.send_uart_command_get_output("version",
- expected_output,
- timeout=0.2)
+ self.ec.send_command_get_output("version",
+ expected_output,
+ timeout=0.2)
return True
except:
logging.error("Cannot talk to EC console.")