# blob: 0c6f29843e327bf9b39a75c13d23c56dd811179b [file] [log] [blame]
# Copyright 2019 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import argparse
import logging
import os
import serial
import subprocess
import sys
import traceback
import coloredlogs
from serial.tools import list_ports
# Module-level logger used by every function in this script.
_LOG = logging.getLogger('unit_test_runner')
# Verification of test pass/failure depends on these strings. If the formatting
# or output of the simple_printing_event_handler changes, this may need to be
# updated. They are bytes literals because they are matched against the raw
# data captured from the serial port.
# Marker used to locate the beginning of a test run in the captured output.
_TESTS_STARTING_STRING = b'[==========] Running all tests.'
# Marker used to detect that the test run reached its end.
_TESTS_DONE_STRING = b'[==========] Done running all tests.'
# Marker whose presence anywhere in the output is treated as a failed run.
_TEST_FAILURE_STRING = b'[ FAILED ]'
class TestingFailure(Exception):
    """Raised when flashing, resetting, capturing, or checking a test fails."""
def parse_args(argv=None):
    """Parses command-line arguments.

    Args:
      argv: Optional list of argument strings to parse. When None, argparse
        falls back to the process command line (sys.argv[1:]).

    Returns:
      An argparse.Namespace with the attributes binary, openocd_config, port,
      baud, test_timeout, and verbose.
    """
    # The summary must be passed as `description`: the first positional
    # parameter of ArgumentParser is `prog`, which would replace the program
    # name printed in usage and error messages.
    parser = argparse.ArgumentParser(
        description='Flashes and then runs on-device unit tests')
    parser.add_argument('binary', help='The target test binary to run')
    parser.add_argument('--openocd-config',
                        required=True,
                        help='Path to openocd configuration file')
    parser.add_argument('--port',
                        required=True,
                        help='The name of the serial port to connect to when '
                        'running tests')
    parser.add_argument('--baud',
                        type=int,
                        default=115200,
                        help='Target baud rate to use for serial communication'
                        ' with target device')
    parser.add_argument('--test-timeout',
                        type=float,
                        default=2.0,
                        help='Maximum communication delay in seconds before a '
                        'test is considered unresponsive and aborted')
    parser.add_argument('--verbose',
                        '-v',
                        dest='verbose',
                        action="store_true",
                        help='Output additional logs as the script runs')
    return parser.parse_args(argv)
def reset_device(openocd_config):
    """Resets the attached device by running openocd.

    The openocd executable is taken from the OPENOCD_PATH environment
    variable, falling back to 'openocd'. It is invoked with the given
    configuration file and the commands 'init', 'reset run', and 'exit'.

    Args:
      openocd_config: Path to the openocd configuration file, passed via -f.

    Raises:
      TestingFailure: openocd exited with a non-zero return code.
    """
    openocd = os.environ.get('OPENOCD_PATH', 'openocd')

    # Build: <openocd> -f <config> -c init -c 'reset run' -c exit
    command = [openocd, '-f', openocd_config]
    for step in ('init', 'reset run', 'exit'):
        command += ['-c', step]

    _LOG.info('Resetting device...')
    # stderr is merged into stdout so a single capture holds all tool output.
    result = subprocess.run(command,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
    tool_output = '\n' + result.stdout.decode('utf-8', errors='replace')

    if result.returncode != 0:
        _LOG.error(tool_output)
        raise TestingFailure('Failed to reset target device.')

    _LOG.debug(tool_output)
    _LOG.info('Successfully reset device!')
def read_serial(openocd_config, port, baud_rate, test_timeout) -> bytes:
    """Resets the device and reads its serial output until a read times out.

    Opens the serial port, discards any pending input, resets the device via
    reset_device(), and then accumulates lines until readline() returns no
    data. Once the tests-done marker has been seen, the read timeout is
    shortened to 0.01 seconds for the remaining lines.

    Args:
      openocd_config: Path to the openocd configuration file used for the
        device reset.
      port: Name of the serial port to open.
      baud_rate: Baud rate to open the serial port with.
      test_timeout: Read timeout in seconds applied to the serial port.

    Returns:
      A bytes object with the captured serial data. If the tests-starting
      marker is present, the data is trimmed to begin at its last occurrence.

    Raises:
      TestingFailure: The port is not open, the device reset failed, or no
        byte arrived within the timeout after the reset.
    """
    serial_data = bytearray()

    # The port is used as a context manager so it is closed on every exit
    # path, including the TestingFailure raises below and a failed reset.
    with serial.Serial(baudrate=baud_rate, port=port,
                       timeout=test_timeout) as device:
        if not device.is_open:
            raise TestingFailure('Failed to open device.')

        # Flush input buffer and reset the device to begin the test.
        device.reset_input_buffer()
        reset_device(openocd_config)

        # Block and wait for the first byte.
        serial_data += device.read()
        if not serial_data:
            raise TestingFailure('Device not producing output. :(')

        # Read with a reasonable timeout until we stop getting characters.
        while True:
            bytes_read = device.readline()
            if not bytes_read:
                break
            serial_data += bytes_read
            if _TESTS_DONE_STRING in serial_data:
                # Set to much more aggressive timeout since the last one or
                # two lines should print out immediately. (one line if all
                # fails or all passes, two lines if mixed.)
                device.timeout = 0.01

    # Try to trim captured results to only contain most recent test run.
    test_start_index = serial_data.rfind(_TESTS_STARTING_STRING)
    if test_start_index != -1:
        serial_data = serial_data[test_start_index:]

    # Convert to an immutable bytes object to honor the declared return type.
    return bytes(serial_data)
def flash_device(binary, openocd_config):
    """Flashes a binary onto the connected device by running openocd.

    The openocd executable is taken from the OPENOCD_PATH environment
    variable, falling back to 'openocd'. It is given the configuration file
    and a single 'program <binary> reset exit' command.

    Args:
      binary: Path of the binary to program onto the device.
      openocd_config: Path to the openocd configuration file, passed via -f.

    Raises:
      TestingFailure: openocd exited with a non-zero return code.
    """
    openocd = os.environ.get('OPENOCD_PATH', 'openocd')
    program_command = 'program ' + binary + ' reset exit'
    command = [openocd, '-f', openocd_config, '-c', program_command]

    _LOG.info('Flashing firmware to device...')
    # stderr is merged into stdout so a single capture holds all tool output.
    result = subprocess.run(command,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
    tool_output = '\n' + result.stdout.decode('utf-8', errors='replace')

    if result.returncode != 0:
        _LOG.error(tool_output)
        raise TestingFailure('Failed to flash target device.')

    _LOG.debug(tool_output)
    _LOG.info('Successfully flashed firmware to device!')
def handle_test_results(test_output):
    """Checks captured test output and raises if the run did not pass.

    Args:
      test_output: Bytes-like captured output of the test run.

    Raises:
      TestingFailure: The start marker is missing, the done marker is
        missing, or the failure marker appears in the output.
    """
    if _TESTS_STARTING_STRING not in test_output:
        raise TestingFailure('Failed to find test start.')

    readable_output = '\n' + test_output.decode('utf-8', errors='replace')

    # An incomplete run takes precedence over reported failures.
    if _TESTS_DONE_STRING not in test_output:
        failure_reason = 'Tests did not complete.'
    elif _TEST_FAILURE_STRING in test_output:
        failure_reason = 'Test suite had one or more failures.'
    else:
        failure_reason = None

    if failure_reason is not None:
        # Failing output is shown at info level so it is visible without -v.
        _LOG.info(readable_output)
        raise TestingFailure(failure_reason)

    _LOG.debug(readable_output)
    _LOG.info('Test passed!')
def run_device_test(binary, test_timeout, openocd_config, baud,
                    port=None) -> bool:
    """Flashes a test binary, captures its output, and checks the results.

    Args:
      binary: Path of the test binary to flash.
      test_timeout: Serial read timeout in seconds.
      openocd_config: Path to the openocd configuration file.
      baud: Baud rate for the serial connection.
      port: Name of the serial port to read test output from.

    Returns:
      True if every step succeeded; False if any step raised TestingFailure.
    """
    _LOG.info(f'Launching test binary {binary}')
    try:
        flash_device(binary, openocd_config)
        captured = read_serial(openocd_config, port, baud, test_timeout)
        handle_test_results(captured)
    except TestingFailure as failure:
        # Show where the failing step raised, then log the reason itself.
        traceback.print_tb(failure.__traceback__)
        _LOG.error(failure)
        passed = False
    else:
        passed = True
    return passed
def main(args=None):
    """Configures logging, runs the device test, and exits with its status.

    The process exits with status 0 when the test passes and 1 otherwise.

    Args:
      args: Not used; it is immediately replaced by the result of
        parse_args().
    """
    args = parse_args()

    log_level = 'DEBUG' if args.verbose else 'INFO'
    level_styles = {
        'debug': {
            'color': 244
        },
        'error': {
            'color': 'red'
        },
    }
    coloredlogs.install(
        level=log_level,
        level_styles=level_styles,
        fmt='TEST - %(asctime)s - %(levelname)s - %(message)s')

    passed = run_device_test(args.binary, args.test_timeout,
                             args.openocd_config, args.baud, args.port)
    sys.exit(0 if passed else 1)
# Run the flash-and-test flow only when this file is executed as a script.
if __name__ == '__main__':
    main()