[autotest] servo: capture CPU/EC UART.
This change adds logs for CPU/EC UART streams. Logging starts at test
start, and logs are saved when the test stops.
TEST=Ran command
test_that -b monroe -m monroe chromeos4-row13-rack12-host10 platform_LongPressPower
and verified that two files, i.e. cpu_uart.log and ec_uart.log were
generated in test results dir.
BUG=chromium:819882
Change-Id: Ia75b290153597e0c0ef2c0e098092b5bc059b59b
Reviewed-on: https://chromium-review.googlesource.com/1139213
Commit-Ready: Congbin Guo <guocb@chromium.org>
Tested-by: Congbin Guo <guocb@chromium.org>
Reviewed-by: Congbin Guo <guocb@chromium.org>
diff --git a/server/cros/servo/servo.py b/server/cros/servo/servo.py
index cdb86f3..377cc0f 100644
--- a/server/cros/servo/servo.py
+++ b/server/cros/servo/servo.py
@@ -5,9 +5,12 @@
# Expects to be run in an environment with sudo and no interactive password
# prompt, such as within the Chromium OS development chroot.
+import ast
+import logging
import os
-
-import logging, re, time, xmlrpclib
+import re
+import time
+import xmlrpclib
from autotest_lib.client.common_lib import error
from autotest_lib.server.cros.servo import firmware_programmer
@@ -109,6 +112,59 @@
self._servo.set_nocheck('power_state', rec_mode)
+class _Uart(object):
+ """Class to capture CPU/EC UART streams."""
+ def __init__(self, servo):
+ self._servo = servo
+ self._streams = []
+
+ def start_capture(self):
+ """Start capturing Uart streams."""
+ logging.debug('Start capturing CPU/EC UART.')
+ self._servo.set('cpu_uart_capture', 'on')
+ self._streams.append(('cpu_uart_stream', 'cpu_uart.log'))
+ try:
+ self._servo.set('ec_uart_capture', 'on')
+ self._streams.append(('ec_uart_stream', 'ec_uart.log'))
+ except error.TestFail as err:
+ if 'No control named' in str(err):
+ logging.debug('The servod is too old that ec_uart_capture not '
+ 'supported.')
+
+ def dump(self, output_dir):
+ """Dump UART streams to log files accordingly.
+
+ @param output_dir: A string of output directory name.
+ """
+ for stream, logfile in self._streams:
+ logfile_fullname = os.path.join(output_dir, logfile)
+ try:
+ content = self._servo.get(stream)
+ except Exception as err:
+ logging.warn('Failed to get UART log for %s: %s', stream, err)
+ continue
+
+ # The UART stream may contain non-printable characters, and servo
+ # returns it in string representation. We use `ast.leteral_eval`
+ # to revert it back.
+ with open(logfile_fullname, 'a') as fd:
+ fd.write(ast.literal_eval(content))
+
+ def stop_capture(self):
+ """Stop capturing UART streams."""
+ logging.debug('Stop capturing CPU/EC UART.')
+ for uart in ('cpu_uart_capture', 'ec_uart_capture'):
+ try:
+ self._servo.set(uart, 'off')
+ except error.TestFail as err:
+ if 'No control named' in str(err):
+ logging.debug('The servod is too old that %s not '
+ 'supported.', uart)
+ except Exception as err:
+ logging.warn('Failed to stop UART logging for %s: %s', uart,
+ err)
+
+
class Servo(object):
"""Manages control of a Servo board.
@@ -176,6 +232,7 @@
self._servo_serial = servo_serial
self._server = servo_host.get_servod_server_proxy()
self._power_state = _PowerStateController(self)
+ self._uart = _Uart(self)
self._usb_state = None
self._programmer = None
@@ -220,6 +277,7 @@
self.set('usb_mux_oe1', 'on')
self._usb_state = None
self.switch_usbkey('off')
+ self._uart.start_capture()
if cold_reset:
self._power_state.reset()
logging.debug('Servo initialized, version is %s',
@@ -851,3 +909,19 @@
else:
self._usb_state = 'host'
return self._usb_state
+
+
+ def dump_uart_streams(self, output_dir):
+ """Get buffered UART streams and append to log files.
+
+ @param output_dir: A string of directory name to save log files.
+ """
+ if self._uart:
+ self._uart.dump(output_dir)
+
+
+ def close(self):
+ """Close the servo object."""
+ if self._uart:
+ self._uart.stop_capture()
+ self._uart = None