blob: d7bbc8974068206308d4945956ac011dccf7d987 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2019 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from acts import base_test
from acts import asserts
from unittest import mock
import socket
import time
# TODO(markdr): Remove this hack after adding zeep to setup.py.
import sys
sys.modules['zeep'] = mock.Mock()
from acts.controllers.rohdeschwarz_lib import contest
class ContestTest(base_test.BaseTestClass):
""" Unit tests for the contest controller."""
LOCAL_HOST_IP = '127.0.0.1'
def test_automation_server_end_to_end(self):
""" End to end test for the Contest object's ability to start an
Automation Server and respond to the commands sent through the
socket interface. """
automation_port = 5555
# Instantiate the mock Contest object. This will start a thread in the
# background running the Automation server.
with mock.patch('zeep.client.Client') as zeep_client:
# Create a MagicMock instance
zeep_client.return_value = mock.MagicMock()
controller = contest.Contest(
logger=self.log,
remote_ip=None,
remote_port=None,
automation_listen_ip=self.LOCAL_HOST_IP,
automation_port=automation_port,
dut_on_func=None,
dut_off_func=None,
ftp_pwd=None,
ftp_usr=None)
# Give some time for the server to initialize as it's running on
# a different thread.
time.sleep(0.01)
# Start a socket connection and send a command
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((self.LOCAL_HOST_IP, automation_port))
s.sendall(b'AtTestcaseStart')
data = s.recv(1024)
asserts.assert_true(data == b'OK\n', "Received OK response.")
controller.destroy()
def test_automation_protocol_calls_dut_off_func_for_on_command(self):
""" Tests the AutomationProtocol's ability to turn the DUT off
upon receiving the requests."""
dut_on_func = mock.Mock()
protocol = contest.AutomationServer.AutomationProtocol(
mock.Mock(), dut_on_func, mock.Mock())
protocol.send_ok = mock.Mock()
protocol.data_received(b'DUT_SWITCH_ON')
asserts.assert_true(dut_on_func.called, 'Function was not called.')
asserts.assert_true(protocol.send_ok.called, 'OK response not sent.')
def test_automation_protocol_calls_dut_on_func_for_off_command(self):
""" Tests the Automation server's ability to turn the DUT on
upon receiving the requests."""
dut_off_func = mock.Mock()
protocol = contest.AutomationServer.AutomationProtocol(
mock.Mock(), mock.Mock(), dut_off_func)
protocol.send_ok = mock.Mock()
protocol.data_received(b'DUT_SWITCH_OFF')
asserts.assert_true(dut_off_func.called, 'Function was not called.')
asserts.assert_true(protocol.send_ok.called, 'OK response not sent.')
def test_automation_protocol_handles_testcase_start_command(self):
""" Tests the Automation server's ability to handle a testcase start
command."""
protocol = contest.AutomationServer.AutomationProtocol(
mock.Mock(), mock.Mock(), None)
protocol.send_ok = mock.Mock()
protocol.data_received(b'AtTestcaseStart name_of_the_testcase')
asserts.assert_true(protocol.send_ok.called, 'OK response not sent.')
def test_automation_protocol_handles_testplan_start_command(self):
""" Tests the Automation server's ability to handle a testplan start
command."""
protocol = contest.AutomationServer.AutomationProtocol(
mock.Mock(), mock.Mock(), None)
protocol.send_ok = mock.Mock()
protocol.data_received(b'AtTestplanStart')
asserts.assert_true(protocol.send_ok.called, 'OK response not sent.')
def test_automation_protocol_handles_testcase_end_command(self):
""" Tests the Automation server's ability to handle a testcase end
command."""
protocol = contest.AutomationServer.AutomationProtocol(
mock.Mock(), mock.Mock(), None)
protocol.send_ok = mock.Mock()
protocol.data_received(b'AfterTestcase')
asserts.assert_true(protocol.send_ok.called, 'OK response not sent.')
def test_automation_protocol_handles_testplan_end_command(self):
""" Tests the Automation server's ability to handle a testplan start
command."""
protocol = contest.AutomationServer.AutomationProtocol(
mock.Mock(), mock.Mock(), None)
protocol.send_ok = mock.Mock()
protocol.data_received(b'AfterTestplan')
asserts.assert_true(protocol.send_ok.called, 'OK response not sent.')
# Makes all time.sleep commands call a mock function that returns
# immediately, rather than sleeping.
@mock.patch('time.sleep')
# Prevents the controller to try to download the results from the FTP server
@mock.patch('acts.controllers.gnssinst_lib.rohdeschwarz.contest'
'.Contest.pull_test_results')
def test_execute_testplan_stops_reading_output_on_exit_line(
self, time_mock, results_func_mock):
""" Makes sure that execute_test plan returns after receiving an
exit code.
Args:
time_mock: time.sleep mock object.
results_func_mock: Contest.pull_test_results mock object.
"""
service_output = mock.Mock()
# An array of what return values. If a value is an Exception, the
# Exception is raised instead.
service_output.side_effect = [
'Output line 1\n', 'Output line 2\n',
'Testplan Directory: \\\\a\\b\\c\n'
'Exit code: 0\n',
AssertionError('Tried to read output after exit code was sent.')
]
with mock.patch('zeep.client.Client') as zeep_client:
zeep_client.return_value.service.DoGetOutput = service_output
controller = contest.Contest(
logger=self.log,
remote_ip=None,
remote_port=None,
automation_listen_ip=None,
automation_port=None,
dut_on_func=None,
dut_off_func=None,
ftp_usr=None,
ftp_pwd=None)
controller.execute_testplan('TestPlan')
controller.destroy()
# Makes all time.sleep commands call a mock function that returns
# immediately, rather than sleeping.
@mock.patch('time.sleep')
# Prevents the controller to try to download the results from the FTP server
@mock.patch.object(contest.Contest, 'pull_test_results')
def test_execute_testplan_detects_results_directory(
self, time_mock, results_func_mock):
""" Makes sure that execute_test is able to detect the testplan
directory from the test output.
Args:
time_mock: time.sleep mock object.
results_func_mock: Contest.pull_test_results mock object.
"""
results_directory = 'results\directory\\name'
service_output = mock.Mock()
# An array of what return values. If a value is an Exception, the
# Exception is raised instead.
service_output.side_effect = [
'Testplan Directory: {}{}\\ \n'.format(
contest.Contest.FTP_ROOT, results_directory), 'Exit code: 0\n'
]
with mock.patch('zeep.client.Client') as zeep_client:
zeep_client.return_value.service.DoGetOutput = service_output
controller = contest.Contest(
logger=self.log,
remote_ip=None,
remote_port=None,
automation_listen_ip=None,
automation_port=None,
dut_on_func=None,
dut_off_func=None,
ftp_usr=None,
ftp_pwd=None)
controller.execute_testplan('TestPlan')
controller.pull_test_results.assert_called_with(results_directory)
controller.destroy()
# Makes all time.sleep commands call a mock function that returns
# immediately, rather than sleeping.
@mock.patch('time.sleep')
# Prevents the controller to try to download the results from the FTP server
@mock.patch.object(contest.Contest, 'pull_test_results')
def test_execute_testplan_fails_when_contest_is_unresponsive(
self, time_mock, results_func_mock):
""" Makes sure that execute_test plan returns after receiving an
exit code.
Args:
time_mock: time.sleep mock object.
results_func_mock: Contest.pull_test_results mock object.
"""
service_output = mock.Mock()
# An array of what return values. If a value is an Exception, the
# Exception is raised instead.
mock_output = [None] * contest.Contest.MAXIMUM_OUTPUT_READ_RETRIES
mock_output.append(
AssertionError('Test did not failed after too many '
'unsuccessful retries.'))
service_output.side_effect = mock_output
with mock.patch('zeep.client.Client') as zeep_client:
zeep_client.return_value.service.DoGetOutput = service_output
controller = contest.Contest(
logger=self.log,
remote_ip=None,
remote_port=None,
automation_listen_ip=None,
automation_port=None,
dut_on_func=None,
dut_off_func=None,
ftp_usr=None,
ftp_pwd=None)
try:
controller.execute_testplan('TestPlan')
except RuntimeError:
pass
controller.destroy()