Move USB probing logic and recovery image install to servo.py.
BUG=chrome-os-partner:381
TEST=Run manually
Change-Id: I2211b7a4b24fff122e3ff6357534a89b4ab0a3ff
Reviewed-on: https://gerrit.chromium.org/gerrit/12258
Reviewed-by: Jon Salz <jsalz@chromium.org>
Tested-by: Jon Salz <jsalz@chromium.org>
Commit-Ready: Jon Salz <jsalz@chromium.org>
diff --git a/server/cros/servo.py b/server/cros/servo.py
index 1b6d2a6..9919e5b 100644
--- a/server/cros/servo.py
+++ b/server/cros/servo.py
@@ -5,10 +5,9 @@
# Expects to be run in an environment with sudo and no interactive password
# prompt, such as within the Chromium OS development chroot.
-import logging
-import time
-import xmlrpclib
-import subprocess
+import logging, os, subprocess, time, xmlrpclib
+from autotest_lib.client.bin import utils as client_utils
+from autotest_lib.server import utils
class Servo:
"""Manages control of a Servo board.
@@ -34,12 +33,16 @@
# Delays to deal with computer transitions.
SLEEP_DELAY = 6
BOOT_DELAY = 10
- RECOVERY_INSTALL_DELAY = 300
+ RECOVERY_BOOT_DELAY = 30
+ RECOVERY_INSTALL_DELAY = 180
# Servo-specific delays.
MAX_SERVO_STARTUP_DELAY = 10
SERVO_SEND_SIGNAL_DELAY = 0.5
+ # Time between an usb disk plugged-in and detected in the system.
+ USB_DETECTION_DELAY = 10
+
def __init__(self, servo_host=None, servo_port=None,
xml_config=['servo.xml'], servo_vid=None, servo_pid=None,
servo_serial=None, cold_reset=False):
@@ -283,6 +286,111 @@
self._server.set(gpio_name, gpio_value)
+ # TODO(waihong) It may fail if multiple servo's are connected to the same
+ # host. Should look for a better way, like the USB serial name, to identify
+ # the USB device.
+ def probe_host_usb_dev(self):
+ """Probe the USB disk device plugged-in the servo from the host side.
+
+ It tries to switch the USB mux to make the host unable to see the
+ USB disk and compares the result difference.
+
+ This only works if the servo is attached to the local host.
+
+ Returns:
+ A string of USB disk path, like '/dev/sdb', or None if not existed.
+ """
+ cmd = 'ls /dev/sd[a-z]'
+ original_value = self.get('usb_mux_sel1')
+
+ # Make the host unable to see the USB disk.
+ if original_value != 'dut_sees_usbkey':
+ self.set('usb_mux_sel1', 'dut_sees_usbkey')
+ time.sleep(self.USB_DETECTION_DELAY)
+ no_usb_set = set(utils.system_output(cmd, ignore_status=True).split())
+
+ # Make the host able to see the USB disk.
+ self.set('usb_mux_sel1', 'servo_sees_usbkey')
+ time.sleep(self.USB_DETECTION_DELAY)
+ has_usb_set = set(utils.system_output(cmd, ignore_status=True).split())
+
+ # Back to its original value.
+ if original_value != 'servo_sees_usbkey':
+ self.set('usb_mux_sel1', original_value)
+ time.sleep(self.USB_DETECTION_DELAY)
+
+ diff_set = has_usb_set - no_usb_set
+ if len(diff_set) == 1:
+ return diff_set.pop()
+ else:
+ return None
+
+
+ def install_recovery_image(self, image_path=None, usb_mount_point=None,
+ wait_for_completion=True):
+ """Install the recovery image specied by the path onto the DUT.
+
+ This method uses google recovery mode to install a recovery image
+ onto a DUT through the use of a USB stick that is mounted on a servo
+ board specified by the usb_mount_point. If no image path is specified
+ we use the recovery image already on the usb image.
+
+ Args:
+ image_path: Path on the host to the recovery image.
+ usb_mount_point: When servo_sees_usbkey is enabled, which dev
+ e.g. /dev/sdb will the usb key show up as.
+ wait_for_completion: Whether to wait for completion of the
+ factory install and disable the USB hub
+ before returning. Currently this is just
+ a hardcoded wait of RECOVERY_INSTALL_DELAY
+ (for the recovery process to complete).
+ """
+ # Set up Servo's usb mux.
+ self.set('prtctl4_pwren', 'on')
+ self.enable_usb_hub(host=True)
+ if image_path and usb_mount_point:
+ logging.info('Installing recovery image onto usb stick. '
+ 'This takes a while ...')
+ client_utils.poll_for_condition(
+ lambda: os.path.exists(usb_mount_point),
+ timeout=Servo.USB_DETECTION_DELAY,
+ desc="%s exists" % usb_mount_point)
+ utils.system(' '.join(
+ ['sudo', 'dd', 'if=%s' % image_path,
+ 'of=%s' % usb_mount_point, 'bs=4M']))
+
+ # Turn the device off.
+ self.power_normal_press()
+ time.sleep(Servo.SLEEP_DELAY)
+
+ # Boot in recovery mode.
+ try:
+ self.set('usb_mux_sel1', 'dut_sees_usbkey')
+ self.enable_recovery_mode()
+ self.power_normal_press()
+ time.sleep(Servo.RECOVERY_BOOT_DELAY)
+ self.disable_recovery_mode()
+
+ if wait_for_completion:
+ # Enable recovery installation.
+ logging.info('Running the recovery process on the DUT. '
+ 'Waiting %d seconds for recovery to complete ...',
+ Servo.RECOVERY_INSTALL_DELAY)
+ time.sleep(Servo.RECOVERY_INSTALL_DELAY)
+
+ # Go back into normal mode and reboot.
+ # Machine automatically reboots after the usb key is removed.
+ logging.info('Removing the usb key from the DUT.')
+ self.disable_usb_hub()
+ time.sleep(Servo.BOOT_DELAY)
+ except:
+ # In case anything went wrong we want to make sure to do a clean
+ # reset.
+ self.disable_recovery_mode()
+ self.warm_reset()
+ raise
+
+
def _init_seq_cold_reset_devmode(self):
"""Cold reset, init device, and boot in dev-mode."""
self.cold_reset()