Add UART communication interface to FAFT sequence
In some cases, we want to be able to communicate through UART. Let's add
a simple interface to do so.
BUG=chrome-os-partner:10142
TEST=Use this interface to control fan speed and query fan status.
Change-Id: Ibb645dcf0d00b9e23f6ca1fa141a70876878ad48
Reviewed-on: https://gerrit.chromium.org/gerrit/25380
Reviewed-by: Tom Wai-Hong Tam <waihong@chromium.org>
Commit-Ready: Vic Yang <victoryang@chromium.org>
Tested-by: Vic Yang <victoryang@chromium.org>
diff --git a/server/cros/faftsequence.py b/server/cros/faftsequence.py
index f7eddeb..a940a16 100644
--- a/server/cros/faftsequence.py
+++ b/server/cros/faftsequence.py
@@ -2,8 +2,10 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
+import fdpexpect
import logging
import os
+import pexpect
import re
import sys
import tempfile
@@ -303,6 +305,99 @@
})
+ def _open_uart_pty(self):
+ """Open UART pty and spawn pexpect object.
+
+ Returns:
+ Tuple (fd, child): fd is the file descriptor of opened UART pty, and
+ child is a fdpexpect object tied to it.
+ """
+ fd = os.open(self.servo.get("uart1_pty"), os.O_RDWR | os.O_NONBLOCK)
+ child = fdpexpect.fdspawn(fd)
+ return (fd, child)
+
+
+ def _flush_uart_pty(self, child):
+ """Flush UART output to prevent previous pending message interferring.
+
+ Args:
+ child: The fdpexpect object tied to UART pty.
+ """
+ child.sendline("")
+ while True:
+ try:
+ child.expect(".", timeout=0.01)
+ except pexpect.TIMEOUT:
+ break
+
+
+ def _uart_send(self, child, line):
+ """Flush and send command through UART.
+
+ Args:
+ child: The pexpect object tied to UART pty.
+ line: String to send through UART.
+
+ Raises:
+ error.TestFail: Raised when writing to UART fails.
+ """
+ logging.info("Sending UART command: %s" % line)
+ self._flush_uart_pty(child)
+ if child.sendline(line) != len(line) + 1:
+ raise error.TestFail("Failed to send UART command.")
+
+
+ def send_uart_command(self, command):
+ """Send command through UART.
+
+ This function open UART pty when called, and then command is sent
+ through UART.
+
+ Args:
+ command: The command string to send.
+
+ Raises:
+ error.TestFail: Raised when writing to UART fails.
+ """
+ (fd, child) = self._open_uart_pty()
+ try:
+ self._uart_send(child, command)
+ finally:
+ os.close(fd)
+
+
+ def send_uart_command_get_output(self, command, regex_list, timeout=1):
+ """Send command through UART and wait for response.
+
+ This function waits for response message matching regular expressions.
+
+ Args:
+ command: The command sent.
+ regex_list: List of regular expressions used to match response message.
+ Note, list must be ordered.
+
+ Returns:
+ List of match objects of response message.
+
+ Raises:
+ error.TestFail: If timed out waiting for EC response.
+ """
+ if not isinstance(regex_list, list):
+ regex_list = [regex_list]
+ result_list = []
+ (fd, child) = self._open_uart_pty()
+ try:
+ self._uart_send(child, command)
+ for regex in regex_list:
+ child.expect(regex, timeout=timeout)
+ result_list.append(child.match)
+ except pexpect.TIMEOUT:
+ raise error.TestFail("Timeout waiting for UART response.")
+ finally:
+ os.close(fd)
+ return result_list
+
+
def _parse_crossystem_output(self, lines):
"""Parse the crossystem output into a dict.