[autotest] Add servo repair process
This CL has the core logic of servo repair process.
A new type of host class, ServoHost, is created for interacting
with our beaglebones. With this class, we can verify and repair
a beaglebone.
CrosHost._initialize now checks whether a servo is required by
a test (see inline comments about how this is achieved).
If a servo is required and the servo is in our lab,
it will attempt to repair bad servo. It will raise an exception
if the servo couldn't be fixed.
CrosHost.repair_full now attempts to repair the servo as well.
If a servo can't be fixed, the repair job will fail leaving the
DUT in "Repair Failed" status.
BUG=chromium:245320
TEST=1)Manually make cros_host believe it is dealing with a lab servo,
and test with repair job, and platform_InstallTestImage.
Confirm when servo is broken, it goes through the servo repair flow.
2) Use local servo by setting servo_args with local servo ip.
Test with repair job and platform_InstallTestImage.
Confirm when servo is broken, it throws a exception.
3) Manually test ServoHost.run, Servo.system, Servo.system_output,
Servo, Servo._scp_image, ensure they work after refactoring.
Change-Id: I9abc3320ba84a604a6534b75d16156bda30b09b5
Reviewed-on: https://chromium-review.googlesource.com/66891
Reviewed-by: Richard Barnette <jrbarnette@chromium.org>
Commit-Queue: Fang Deng <fdeng@chromium.org>
Tested-by: Fang Deng <fdeng@chromium.org>
diff --git a/server/cros/servo/servo.py b/server/cros/servo/servo.py
index 07d1558..13d0cd6 100644
--- a/server/cros/servo/servo.py
+++ b/server/cros/servo/servo.py
@@ -10,7 +10,6 @@
import logging, re, time, xmlrpclib
from autotest_lib.client.common_lib import error
-from autotest_lib.client.common_lib.cros import retry
from autotest_lib.server import utils
from autotest_lib.server.cros.servo import power_state_controller
from autotest_lib.server.cros.servo import programmer
@@ -101,28 +100,19 @@
KEY_MATRIX = [KEY_MATRIX_ALT_0, KEY_MATRIX_ALT_1, KEY_MATRIX_ALT_2]
- def __init__(self, servo_host='localhost', servo_port=9999):
+ def __init__(self, servo_host):
"""Sets up the servo communication infrastructure.
- @param servo_host Name of the host where the servod process
- is running.
- @param servo_port Port the servod process is listening on.
+ @param servo_host: A ServoHost object representing
+ the host running servod.
"""
self._key_matrix = 0
- self._server = None
- self._connect_servod(servo_host, servo_port)
- self._is_localhost = (servo_host == 'localhost')
- # TODO(jrbarnette): As of this writing, not all beaglebone
- # servo hosts support the get_board() function. For now, we
- # treat the problem hosts as an unsupported board. The try
- # wrapper should be removed once all the hosts are updated.
- board = None
- try:
- board = self._server.get_board()
- except xmlrpclib.Fault as e:
- logging.error('Failed to create power state controller; '
- 'check hdctools version on %s.', servo_host)
- logging.exception(e)
+ # TODO(fdeng): crbug.com/298379
+ # We should move servo_host object out of servo object
+ # to minimize the dependencies on the rest of Autotest.
+ self._servo_host = servo_host
+ self._server = servo_host.get_servod_server_proxy()
+ board = self._server.get_board()
self._power_state = (
power_state_controller.create_controller(self, board))
@@ -133,22 +123,6 @@
self.set('usb_mux_oe1', 'on')
self.switch_usbkey('off')
- # Commands on the servo host must be run by the superuser. Our account
- # on Beaglebone is root, but locally we might be running as a
- # different user. If so - `sudo ' will have to be added to the
- # commands.
- if self._is_localhost:
- self._sudo_required = utils.system_output('id -u') != '0'
- self._ssh_prefix = ''
- else:
- common_options = ('-o BatchMode=yes '
- '-o StrictHostKeyChecking=no '
- '-o UserKnownHostsFile=/dev/null')
- self._sudo_required = False
- self._ssh_prefix = 'ssh -a -x %s root@%s ' % (
- common_options, servo_host)
- self._scp_cmd_template = 'scp -r %s ' % common_options
- self._scp_cmd_template += '%s ' + 'root@' + servo_host + ':%s'
def get_power_state_controller(self):
"""Return the power state controller for this Servo.
@@ -159,6 +133,7 @@
"""
return self._power_state
+
def initialize_dut(self, cold_reset=False):
"""Initializes a dut for testing purposes.
@@ -191,7 +166,7 @@
Returns:
True if local hosted; otherwise, False.
"""
- return self._is_localhost
+ return self._servo_host.is_localhost()
def power_long_press(self):
@@ -571,31 +546,6 @@
self.switch_usbkey('dut')
- def _connect_servod(self, servo_host, servo_port):
- """Connect to the Servod process with XMLRPC.
-
- Args:
- servo_port: Port the Servod process is listening on.
- """
- remote = 'http://%s:%s' % (servo_host, servo_port)
- self._server = xmlrpclib.ServerProxy(remote)
- try:
- # We have noticed incidents where we are able to connect but the
- # actual servo calls go on for 20-30 mins. Thus _connect_servod now
- # grabs the pwr_button state and if it times out raises an error.
- timeout, result = retry.timeout(
- self._server.get, args=('pwr_button', ),
- timeout_sec=Servo.INIT_TIMEOUT_SECS)
- if timeout:
- raise error.AutotestError('Timed out getting value for '
- 'pwr_button.')
- except Exception as e:
- # Make sure to catch any errors that can occur connecting to servod.
- error_msg = 'Connection to servod failed. Failure: %s' % e
- logging.error(error_msg)
- raise error.AutotestError(error_msg)
-
-
def _scp_image(self, image_path):
"""Copy image to the servo host.
@@ -610,19 +560,14 @@
"""
dest_path = os.path.join('/tmp', os.path.basename(image_path))
- scp_cmd = self._scp_cmd_template % (image_path, dest_path)
- utils.system(scp_cmd)
+ self._servo_host.send_file(image_path, dest_path)
return dest_path
def system(self, command, timeout=None):
"""Execute the passed in command on the servod host."""
- if self._sudo_required:
- command = 'sudo -n %s' % command
- if self._ssh_prefix:
- command = "%s '%s'" % (self._ssh_prefix, command)
logging.info('Will execute on servo host: %s', command)
- utils.system(command, timeout=timeout)
+ self._servo_host.run(command, timeout=timeout)
def system_output(self, command, timeout=None,
@@ -638,14 +583,11 @@
parameter for the command
@return: command's stdout as a string.
"""
- if self._sudo_required:
- command = 'sudo -n %s' % command
- if self._ssh_prefix:
- command = "%s '%s'" % (self._ssh_prefix, command)
logging.info('Will execute and collect output on servo host: %s %s',
command, ' '.join("'%s'" % x for x in args))
- return utils.system_output(command, timeout=timeout,
- ignore_status=ignore_status, args=args)
+ return self._servo_host.run(command, timeout=timeout,
+ ignore_status=ignore_status,
+ args=args).stdout.strip()
def program_ec(self, board, image):