blob: 8926409eaa5141260bdb1033fd65571cb550e235 [file] [log] [blame]
#!/usr/bin/env python3.4
#
# Copyright 2018 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import sys
import random
import time
import acts.controllers.packet_capture as packet_capture
import acts.signals as signals
import acts.test_utils.wifi.rpm_controller_utils as rutils
import acts.test_utils.wifi.wifi_datastore_utils as dutils
import acts.test_utils.wifi.wifi_test_utils as wutils
from acts import asserts
from acts.base_test import BaseTestClass
from acts.controllers.ap_lib import hostapd_constants
from acts.test_decorators import test_tracker_info
from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest
WifiEnums = wutils.WifiEnums
WAIT_BEFORE_CONNECTION = 1
SINGLE_BAND = 1
DUAL_BAND = 2
TIMEOUT = 60
TEST = 'test_'
PING_ADDR = 'www.google.com'
NUM_LINK_PROBES = 3
PROBE_DELAY_SEC = 1
class WifiChaosTest(WifiBaseTest):
""" Tests for wifi IOT
Test Bed Requirement:
* One Android device
* Wi-Fi IOT networks visible to the device
"""
def __init__(self, configs):
BaseTestClass.__init__(self, configs)
self.generate_interop_tests()
def randomize_testcases(self):
"""Randomize the list of hosts and build a random order of tests,
based on SSIDs, keeping all the relevant bands of an AP together.
"""
temp_tests = list()
hosts = self.user_params['interop_host']
random.shuffle(hosts)
for host in hosts:
ssid_2g = None
ssid_5g = None
info = dutils.show_device(host)
# Based on the information in datastore, add to test list if
# AP has 2G band.
if 'ssid_2g' in info:
ssid_2g = info['ssid_2g']
temp_tests.append(TEST + ssid_2g)
# Based on the information in datastore, add to test list if
# AP has 5G band.
if 'ssid_5g' in info:
ssid_5g = info['ssid_5g']
temp_tests.append(TEST + ssid_5g)
self.tests = temp_tests
def generate_interop_testcase(self, base_test, testcase_name, ssid_dict):
"""Generates a single test case from the given data.
Args:
base_test: The base test case function to run.
testcase_name: The name of the test case.
ssid_dict: The information about the network under test.
"""
ssid = testcase_name
test_tracker_uuid = ssid_dict[testcase_name]['uuid']
hostname = ssid_dict[testcase_name]['host']
if not testcase_name.startswith('test_'):
testcase_name = 'test_%s' % testcase_name
test_case = test_tracker_info(uuid=test_tracker_uuid)(
lambda: base_test(ssid, hostname))
setattr(self, testcase_name, test_case)
self.tests.append(testcase_name)
def generate_interop_tests(self):
for ssid_dict in self.user_params['interop_ssid']:
testcase_name = list(ssid_dict)[0]
self.generate_interop_testcase(self.interop_base_test,
testcase_name, ssid_dict)
self.randomize_testcases()
def setup_class(self):
self.dut = self.android_devices[0]
self.admin = 'admin' + str(random.randint(1000001, 12345678))
wutils.wifi_test_device_init(self.dut)
# Set country code explicitly to "US".
self.dut.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
asserts.assert_true(
self.lock_pcap(),
"Could not lock a Packet Capture. Aborting Interop test.")
wutils.wifi_toggle_state(self.dut, True)
def lock_pcap(self):
"""Lock a Packet Capturere to use for the test."""
# Get list of devices from the datastore.
locked_pcap = False
devices = dutils.get_devices()
for device in devices:
device_name = device['hostname']
device_type = device['ap_label']
if device_type == 'PCAP' and not device['lock_status']:
if dutils.lock_device(device_name, self.admin):
self.pcap_host = device_name
host = device['ip_address']
self.log.info("Locked Packet Capture device: %s" % device_name)
locked_pcap = True
break
else:
self.log.warning("Failed to lock %s PCAP." % device_name)
if not locked_pcap:
return False
pcap_config = {'ssh_config':{'user':'root'} }
pcap_config['ssh_config']['host'] = host
self.pcap = packet_capture.PacketCapture(pcap_config)
return True
def setup_test(self):
self.dut.droid.wakeLockAcquireBright()
self.dut.droid.wakeUpNow()
def on_pass(self, test_name, begin_time):
wutils.stop_pcap(self.pcap, self.pcap_procs, True)
def on_fail(self, test_name, begin_time):
# Sleep to ensure all failed packets are captured.
time.sleep(5)
wutils.stop_pcap(self.pcap, self.pcap_procs, False)
self.dut.take_bug_report(test_name, begin_time)
self.dut.cat_adb_log(test_name, begin_time)
def teardown_test(self):
self.dut.droid.wakeLockRelease()
self.dut.droid.goToSleepNow()
def teardown_class(self):
# Unlock the PCAP.
if not dutils.unlock_device(self.pcap_host):
self.log.warning("Failed to unlock %s PCAP. Check in datastore.")
"""Helper Functions"""
def scan_and_connect_by_id(self, network, net_id):
"""Scan for network and connect using network id.
Args:
net_id: Integer specifying the network id of the network.
"""
ssid = network[WifiEnums.SSID_KEY]
wutils.start_wifi_connection_scan_and_ensure_network_found(self.dut,
ssid)
wutils.wifi_connect_by_id(self.dut, net_id)
def run_ping(self, sec):
"""Run ping for given number of seconds.
Args:
sec: Time in seconds to run teh ping traffic.
"""
self.log.info("Running ping for %d seconds" % sec)
result = self.dut.adb.shell("ping -w %d %s" % (sec, PING_ADDR),
timeout=sec + 1)
self.log.debug("Ping Result = %s" % result)
if "100% packet loss" in result:
raise signals.TestFailure("100% packet loss during ping")
def send_link_probes(self, network):
"""
Send link probes, and verify that the device and AP did not crash.
Also verify that at least one link probe succeeded.
Steps:
1. Send a few link probes.
2. Verify that at least one link probe succeeded.
3. Ensure that the device and AP did not crash (by checking that the
device remains connected to the expected network).
"""
results = wutils.send_link_probes(
self.dut, NUM_LINK_PROBES, PROBE_DELAY_SEC)
asserts.assert_true(any(result.is_success for result in results),
"Expect at least 1 probe success: " + str(results))
wifi_info = self.dut.droid.wifiGetConnectionInfo()
expected = network[WifiEnums.SSID_KEY]
actual = wifi_info[WifiEnums.SSID_KEY]
asserts.assert_equal(
expected, actual,
"Device did not remain connected after sending link probes!")
def unlock_and_turn_off_ap(self, hostname, rpm_port, rpm_ip):
"""UNlock the AP in datastore and turn off the AP.
Args:
hostname: Hostname of the AP.
rpm_port: Port number on the RPM for the AP.
rpm_ip: RPM's IP address.
"""
# Un-Lock AP in datastore.
self.log.debug("Un-lock AP in datastore")
if not dutils.unlock_device(hostname):
self.log.warning("Failed to unlock %s AP. Check AP in datastore.")
# Turn OFF AP from the RPM port.
rutils.turn_off_ap(rpm_port, rpm_ip)
def run_connect_disconnect(self, network, hostname, rpm_port, rpm_ip,
release_ap):
"""Run connect/disconnect to a given network in loop.
Args:
network: Dict, network information.
hostname: Hostanme of the AP to connect to.
rpm_port: Port number on the RPM for the AP.
rpm_ip: Port number on the RPM for the AP.
release_ap: Flag to determine if we should turn off the AP yet.
Raises: TestFailure if the network connection fails.
"""
for attempt in range(5):
try:
begin_time = time.time()
ssid = network[WifiEnums.SSID_KEY]
net_id = self.dut.droid.wifiAddNetwork(network)
asserts.assert_true(net_id != -1, "Add network %s failed" % network)
self.log.info("Connecting to %s" % ssid)
self.scan_and_connect_by_id(network, net_id)
self.run_ping(10)
self.send_link_probes(network)
wutils.wifi_forget_network(self.dut, ssid)
time.sleep(WAIT_BEFORE_CONNECTION)
except:
self.log.error("Connection to %s network failed on the %d "
"attempt." % (ssid, attempt))
# TODO:(bmahadev) Uncomment after scan issue is fixed.
# self.dut.take_bug_report(ssid, begin_time)
# self.dut.cat_adb_log(ssid, begin_time)
if release_ap:
self.unlock_and_turn_off_ap(hostname, rpm_port, rpm_ip)
raise signals.TestFailure("Failed to connect to %s" % ssid)
def get_band_and_chan(self, ssid):
"""Get the band and channel information from SSID.
Args:
ssid: SSID of the network.
"""
ssid_info = ssid.split('_')
self.band = ssid_info[-1]
for item in ssid_info:
# Skip over the router model part.
if 'ch' in item and item != ssid_info[0]:
self.chan = re.search(r'(\d+)',item).group(0)
return
raise signals.TestFailure("Channel information not found in SSID.")
def interop_base_test(self, ssid, hostname):
"""Base test for all the connect-disconnect interop tests.
Args:
ssid: string, SSID of the network to connect to.
hostname: string, hostname of the AP.
Steps:
1. Lock AP in datstore.
2. Turn on AP on the rpm switch.
3. Run connect-disconnect in loop.
4. Turn off AP on the rpm switch.
5. Unlock AP in datastore.
"""
network = {}
network['password'] = 'password'
network['SSID'] = ssid
release_ap = False
wutils.reset_wifi(self.dut)
# Lock AP in datastore.
self.log.info("Lock AP in datastore")
ap_info = dutils.show_device(hostname)
# If AP is locked by a different test admin, then we skip.
if ap_info['lock_status'] and ap_info['locked_by'] != self.admin:
raise signals.TestSkip("AP %s is locked, skipping test" % hostname)
if not dutils.lock_device(hostname, self.admin):
self.log.warning("Failed to lock %s AP. Unlock AP in datastore"
" and try again.")
raise signals.TestFailure("Failed to lock AP")
band = SINGLE_BAND
if ('ssid_2g' in ap_info) and ('ssid_5g' in ap_info):
band = DUAL_BAND
if (band == SINGLE_BAND) or (
band == DUAL_BAND and '5G' in ssid):
release_ap = True
# Get AP RPM attributes and Turn ON AP.
rpm_ip = ap_info['rpm_ip']
rpm_port = ap_info['rpm_port']
rutils.turn_on_ap(self.pcap, ssid, rpm_port, rpm_ip=rpm_ip)
self.log.info("Finished turning ON AP.")
# Experimental. Some APs take upto a min to come online.
time.sleep(60)
self.get_band_and_chan(ssid)
self.pcap.configure_monitor_mode(self.band, self.chan)
self.pcap_procs = wutils.start_pcap(
self.pcap, self.band.lower(), self.log_path, self.test_name)
self.run_connect_disconnect(network, hostname, rpm_port, rpm_ip,
release_ap)
# Un-lock only if it's a single band AP or we are running the last band.
if release_ap:
self.unlock_and_turn_off_ap(hostname, rpm_port, rpm_ip)