Refactor RvR and add Sensitivity Tests.
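This CL refactors the existing WifiRvrTest class to increase
opportunities for code reuse and adds a WifiSensitivityTest class.
Sensitivity tests inherit from RvR, essentially running RvR at a fixed
MCS or data rate at the AP and checking when the DUT can no longer
sustain the expected throughput from that rate.
Supporting changes: the iperf result parser now replaces any "nan"
(not only "-nan") with 0 before JSON decoding, and the retail AP
helper adds a 20 MHz "legacy" bandwidth mode and selects legacy
bitrates whenever the mode name contains "legacy".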
Test: Done
Bug: None
Change-Id: I83517d7143e632f27940249a687f3ad4b4ab5ecc
Signed-off-by: Omar El Ayach <oelayach@google.com>
diff --git a/acts/framework/acts/controllers/iperf_server.py b/acts/framework/acts/controllers/iperf_server.py
index 157d2e7..dc483bb 100755
--- a/acts/framework/acts/controllers/iperf_server.py
+++ b/acts/framework/acts/controllers/iperf_server.py
@@ -82,7 +82,7 @@
iperf_output = iperf_output[0:
iperf_output.index("}\n") + 1]
iperf_string = ''.join(iperf_output)
- iperf_string = iperf_string.replace("-nan", '0')
+ iperf_string = iperf_string.replace("nan", '0')
self.result = json.loads(iperf_string)
except ValueError:
with open(result_path, 'r') as f:
diff --git a/acts/framework/acts/test_utils/wifi/wifi_retail_ap.py b/acts/framework/acts/test_utils/wifi/wifi_retail_ap.py
index 82b8886..e9d607e 100644
--- a/acts/framework/acts/test_utils/wifi/wifi_retail_ap.py
+++ b/acts/framework/acts/test_utils/wifi/wifi_retail_ap.py
@@ -972,7 +972,12 @@
"2G": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
"5G_1": [36, 40, 44, 48, 149, 153, 157, 161, 165]
}
- self.BW_MODE_MAP = {"VHT20": 20, "VHT40": 40, "VHT80": 80}
+ self.BW_MODE_MAP = {
+ "legacy": 20,
+ "VHT20": 20,
+ "VHT40": 40,
+ "VHT80": 80
+ }
self.default_settings = {
"region": "United States",
"brand": "Google",
@@ -1154,7 +1159,7 @@
interface = self.access_point.wlan_5g
interface_short = "5"
- if mode.lower() in ["legacy", "11a", "11b", "11g"]:
+ if "legacy" in mode.lower():
cmd_string = "iw dev {0} set bitrates legacy-{1} {2} ht-mcs-{1} vht-mcs-{1}".format(
interface, interface_short, rate)
elif "vht" in mode.lower():
diff --git a/acts/tests/google/wifi/WifiRvrTest.py b/acts/tests/google/wifi/WifiRvrTest.py
index 5da2e93..4ebcb66 100644
--- a/acts/tests/google/wifi/WifiRvrTest.py
+++ b/acts/tests/google/wifi/WifiRvrTest.py
@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import collections
import json
import logging
import math
@@ -42,26 +43,31 @@
TEST_TIMEOUT = 10
SHORT_SLEEP = 1
MED_SLEEP = 5
+ MAX_CONSECUTIVE_ZEROS = 5
def __init__(self, controllers):
base_test.BaseTestClass.__init__(self, controllers)
def setup_class(self):
+ """Initializes common test hardware and parameters.
+
+ This function initializes hardware and compiles parameters that are
+ common to all tests in this class.
+ """
self.client_dut = self.android_devices[-1]
- req_params = ["rvr_test_params", "testbed_params"]
- opt_params = [
- "main_network", "RetailAccessPoints", "golden_files_list"
+ req_params = [
+ "RetailAccessPoints", "rvr_test_params", "testbed_params"
]
+ opt_params = ["main_network", "golden_files_list"]
self.unpack_userparams(req_params, opt_params)
- self.test_params = self.rvr_test_params
+ self.testclass_params = self.rvr_test_params
self.num_atten = self.attenuators[0].instrument.num_atten
self.iperf_server = self.iperf_servers[0]
- if hasattr(self, "RetailAccessPoints"):
- self.access_points = retail_ap.create(self.RetailAccessPoints)
- self.access_point = self.access_points[0]
- self.log.info("Access Point Configuration: {}".format(
- self.access_point.ap_settings))
- self.log_path = os.path.join(logging.log_path, "rvr_results")
+ self.access_points = retail_ap.create(self.RetailAccessPoints)
+ self.access_point = self.access_points[0]
+ self.log.info("Access Point Configuration: {}".format(
+ self.access_point.ap_settings))
+ self.log_path = os.path.join(logging.log_path, "results")
utils.create_dir(self.log_path)
if not hasattr(self, "golden_files_list"):
self.golden_files_list = [
@@ -79,11 +85,13 @@
self.iperf_server.stop()
def teardown_class(self):
- """Saves plot with all test results to enable comparison.
- """
# Turn WiFi OFF
for dev in self.android_devices:
wutils.wifi_toggle_state(dev, False)
+ self.process_testclass_results()
+
+ def process_testclass_results(self):
+ """Saves plot with all test results to enable comparison."""
# Plot and save all results
x_data = []
y_data = []
@@ -106,7 +114,7 @@
"linewidth": 3,
"markersize": 10
}
- output_file_path = "{}/{}.html".format(self.log_path, "rvr_results")
+ output_file_path = os.path.join(self.log_path, 'results.html')
wputils.bokeh_plot(
data_sets,
legends,
@@ -125,15 +133,8 @@
rvr_result: dict containing attenuation, throughput and other meta
data
"""
- test_name = self.current_test_name
- golden_path = [
- file_name for file_name in self.golden_files_list
- if test_name in file_name
- ]
try:
- golden_path = golden_path[0]
- throughput_limits = self.compute_throughput_limits(
- golden_path, rvr_result)
+ throughput_limits = self.compute_throughput_limits(rvr_result)
except:
asserts.fail("Test failed: Golden file not found")
@@ -151,14 +152,14 @@
format(current_att, current_throughput,
throughput_limits["lower_limit"][idx],
throughput_limits["upper_limit"][idx]))
- if failure_count >= self.test_params["failure_count_tolerance"]:
+ if failure_count >= self.testclass_params["failure_count_tolerance"]:
asserts.fail("Test failed. Found {} points outside limits.".format(
failure_count))
asserts.explicit_pass(
"Test passed. Found {} points outside throughput limits.".format(
failure_count))
- def compute_throughput_limits(self, golden_path, rvr_result):
+ def compute_throughput_limits(self, rvr_result):
"""Compute throughput limits for current test.
Checks the RvR test result and compares to a throughput limites for
@@ -166,12 +167,14 @@
config file.
Args:
- golden_path: path to golden file used to generate limits
rvr_result: dict containing attenuation, throughput and other meta
data
Returns:
throughput_limits: dict containing attenuation and throughput limit data
"""
+ test_name = self.current_test_name
+ golden_path = next(file_name for file_name in self.golden_files_list
+ if test_name in file_name)
with open(golden_path, 'r') as golden_file:
golden_results = json.load(golden_file)
golden_attenuation = [
@@ -199,12 +202,13 @@
attenuation.append(current_att)
lower_limit.append(
- max(closest_throughputs[0] - max(
- self.test_params["abs_tolerance"], closest_throughputs[0] *
- self.test_params["pct_tolerance"] / 100), 0))
+ max(closest_throughputs[0] -
+ max(self.testclass_params["abs_tolerance"],
+ closest_throughputs[0] *
+ self.testclass_params["pct_tolerance"] / 100), 0))
upper_limit.append(closest_throughputs[-1] + max(
- self.test_params["abs_tolerance"], closest_throughputs[-1] *
- self.test_params["pct_tolerance"] / 100))
+ self.testclass_params["abs_tolerance"], closest_throughputs[-1]
+ * self.testclass_params["pct_tolerance"] / 100))
throughput_limits = {
"attenuation": attenuation,
"lower_limit": lower_limit,
@@ -212,7 +216,7 @@
}
return throughput_limits
- def post_process_results(self, rvr_result):
+ def process_test_results(self, rvr_result):
"""Saves plots and JSON formatted results.
Args:
@@ -242,11 +246,9 @@
"markersize": 10
}
try:
- golden_path = [
- file_name for file_name in self.golden_files_list
- if test_name in file_name
- ]
- golden_path = golden_path[0]
+ golden_path = next(file_name
+ for file_name in self.golden_files_list
+ if test_name in file_name)
with open(golden_path, 'r') as golden_file:
golden_results = json.load(golden_file)
legends.insert(0, "Golden Results")
@@ -256,8 +258,7 @@
]
data_sets[0].insert(0, golden_attenuation)
data_sets[1].insert(0, golden_results["throughput_receive"])
- throughput_limits = self.compute_throughput_limits(
- golden_path, rvr_result)
+ throughput_limits = self.compute_throughput_limits(rvr_result)
shaded_region = {
"x_vector": throughput_limits["attenuation"],
"lower_limit": throughput_limits["lower_limit"],
@@ -270,33 +271,34 @@
wputils.bokeh_plot(data_sets, legends, fig_property, shaded_region,
output_file_path)
- def rvr_test(self):
+ def run_rvr_test(self, testcase_params):
"""Test function to run RvR.
The function runs an RvR test in the current device/AP configuration.
Function is called from another wrapper function that sets up the
testbed for the RvR test
+ Args:
+ testcase_params: dict containing test-specific parameters
Returns:
rvr_result: dict containing rvr_results and meta data
"""
self.log.info("Start running RvR")
- rvr_result = []
- for atten in self.rvr_atten_range:
+ zero_counter = 0
+ throughput = []
+ for atten in self.atten_range:
# Set Attenuation
self.log.info("Setting attenuation to {} dB".format(atten))
- [
- self.attenuators[i].set_atten(atten)
- for i in range(self.num_atten)
- ]
+ for attenuator in self.attenuators:
+ attenuator.set_atten(atten)
# Start iperf session
self.iperf_server.start(tag=str(atten))
try:
client_output = ""
client_status, client_output = self.client_dut.run_iperf_client(
self.testbed_params["iperf_server_address"],
- self.iperf_args,
- timeout=self.test_params["iperf_duration"] +
+ testcase_params["iperf_args"],
+ timeout=self.testclass_params["iperf_duration"] +
self.TEST_TIMEOUT)
except:
self.log.warning("TimeoutError: Iperf measurement timed out.")
@@ -307,81 +309,125 @@
out_file.write("\n".join(client_output))
self.iperf_server.stop()
# Parse and log result
- if self.use_client_output:
+ if testcase_params["use_client_output"]:
iperf_file = client_output_path
else:
iperf_file = self.iperf_server.log_files[-1]
try:
iperf_result = ipf.IPerfResult(iperf_file)
curr_throughput = (math.fsum(iperf_result.instantaneous_rates[
- self.test_params["iperf_ignored_interval"]:-1]) / len(
- iperf_result.instantaneous_rates[self.test_params[
+ self.testclass_params["iperf_ignored_interval"]:-1]) / len(
+ iperf_result.instantaneous_rates[self.testclass_params[
"iperf_ignored_interval"]:-1])) * 8 * (1.024**2)
except:
self.log.warning(
"ValueError: Cannot get iperf result. Setting to 0")
curr_throughput = 0
- rvr_result.append(curr_throughput)
+ throughput.append(curr_throughput)
self.log.info("Throughput at {0:.2f} dB is {1:.2f} Mbps".format(
atten, curr_throughput))
- [self.attenuators[i].set_atten(0) for i in range(self.num_atten)]
+ if curr_throughput == 0:
+ zero_counter = zero_counter + 1
+ else:
+ zero_counter = 0
+ if zero_counter == self.MAX_CONSECUTIVE_ZEROS:
+ self.log.info(
+ "Throughput stable at 0 Mbps. Stopping test now.")
+ throughput.extend([0] *
+ (len(self.atten_range) - len(throughput)))
+ break
+ for attenuator in self.attenuators:
+ attenuator.set_atten(0)
+ # Compile test result and meta data
+ rvr_result = collections.OrderedDict()
+ rvr_result["test_name"] = self.current_test_name
+ rvr_result["ap_settings"] = self.access_point.ap_settings.copy()
+ rvr_result["fixed_attenuation"] = self.testbed_params[
+ "fixed_attenuation"][str(testcase_params["channel"])]
+ rvr_result["attenuation"] = list(self.atten_range)
+ rvr_result["throughput_receive"] = throughput
return rvr_result
- def rvr_test_func(self, channel, mode):
- """Main function to test RvR.
-
- The function sets up the AP in the correct channel and mode
- configuration and called run_rvr to sweep attenuation and measure
- throughput
+ def setup_ap(self, testcase_params):
+ """Sets up the access point in the configuration required by the test.
Args:
- channel: Specifies AP's channel
- mode: Specifies AP's bandwidth/mode (11g, VHT20, VHT40, VHT80)
- Returns:
- rvr_result: dict containing rvr_results and meta data
+ testcase_params: dict containing AP and other test params
"""
- #Initialize RvR test parameters
- num_atten_steps = int((self.test_params["rvr_atten_stop"] -
- self.test_params["rvr_atten_start"]) /
- self.test_params["rvr_atten_step"])
- self.rvr_atten_range = [
- self.test_params["rvr_atten_start"] +
- x * self.test_params["rvr_atten_step"]
- for x in range(0, num_atten_steps)
- ]
- rvr_result = {}
- # Configure AP
- band = self.access_point.band_lookup_by_channel(channel)
+ band = self.access_point.band_lookup_by_channel(
+ testcase_params["channel"])
if "2G" in band:
- frequency = wutils.WifiEnums.channel_2G_to_freq[channel]
+ frequency = wutils.WifiEnums.channel_2G_to_freq[testcase_params[
+ "channel"]]
else:
- frequency = wutils.WifiEnums.channel_5G_to_freq[channel]
+ frequency = wutils.WifiEnums.channel_5G_to_freq[testcase_params[
+ "channel"]]
if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
self.access_point.set_region(self.testbed_params["DFS_region"])
else:
self.access_point.set_region(self.testbed_params["default_region"])
- self.access_point.set_channel(band, channel)
- self.access_point.set_bandwidth(band, mode)
+ self.access_point.set_channel(band, testcase_params["channel"])
+ self.access_point.set_bandwidth(band, testcase_params["mode"])
self.log.info("Access Point Configuration: {}".format(
self.access_point.ap_settings))
- # Set attenuator to 0 dB
- [self.attenuators[i].set_atten(0) for i in range(self.num_atten)]
- # Resest, configure, and connect DUT
+
+ def setup_dut(self, testcase_params):
+ """Sets up the DUT in the configuration required by the test.
+
+ Args:
+ testcase_params: dict containing AP and other test params
+ """
+ band = self.access_point.band_lookup_by_channel(
+ testcase_params["channel"])
wutils.reset_wifi(self.client_dut)
- self.client_dut.droid.wifiSetCountryCode(self.test_params["country_code"])
- self.main_network[band]["channel"] = channel
+ self.client_dut.droid.wifiSetCountryCode(
+ self.testclass_params["country_code"])
+ self.main_network[band]["channel"] = testcase_params["channel"]
wutils.wifi_connect(
self.client_dut, self.main_network[band], num_of_tries=5)
time.sleep(self.MED_SLEEP)
- # Run RvR and log result
- rvr_result["test_name"] = self.current_test_name
- rvr_result["ap_settings"] = self.access_point.ap_settings.copy()
- rvr_result["attenuation"] = list(self.rvr_atten_range)
- rvr_result["fixed_attenuation"] = self.testbed_params[
- "fixed_attenuation"][str(channel)]
- rvr_result["throughput_receive"] = self.rvr_test()
- self.testclass_results.append(rvr_result)
- return rvr_result
+
+ def setup_rvr_test(self, testcase_params):
+ """Function that gets devices ready for the test.
+
+ Args:
+ testcase_params: dict containing test-specific parameters
+ """
+ #Initialize RvR test parameters
+ num_atten_steps = int(
+ (testcase_params["atten_stop"] - testcase_params["atten_start"]) /
+ testcase_params["atten_step"])
+ self.atten_range = [
+ testcase_params["atten_start"] + x * testcase_params["atten_step"]
+ for x in range(0, num_atten_steps)
+ ]
+ # Configure AP
+ self.setup_ap(testcase_params)
+ # Set attenuator to 0 dB
+ for attenuator in self.attenuators:
+ attenuator.set_atten(0)
+ # Reset, configure, and connect DUT
+ self.setup_dut(testcase_params)
+
+ def parse_test_params(self, test_name):
+ """Function that generates test params based on the test name."""
+ test_name_params = test_name.split("_")
+ testcase_params = collections.OrderedDict()
+ testcase_params["channel"] = int(test_name_params[4][2:])
+ testcase_params["mode"] = test_name_params[5]
+ testcase_params["iperf_args"] = '-i 1 -t {} -J '.format(
+ self.testclass_params["iperf_duration"])
+ if test_name_params[2] == "UDP":
+ testcase_params[
+ "iperf_args"] = testcase_params["iperf_args"] + "-u -b {}".format(
+ self.testclass_params["UDP_rates"][testcase_params["mode"]])
+ if test_name_params[3] == "DL":
+ testcase_params[
+ "iperf_args"] = testcase_params["iperf_args"] + ' -R'
+ testcase_params["use_client_output"] = True
+ else:
+ testcase_params["use_client_output"] = False
+ return testcase_params
def _test_rvr(self):
""" Function that gets called for each test case
@@ -389,21 +435,17 @@
The function gets called in each rvr test case. The function customizes
the rvr test based on the test name of the test that called it
"""
- test_params = self.current_test_name.split("_")
- channel = int(test_params[4][2:])
- mode = test_params[5]
- self.iperf_args = '-i 1 -t {} -J '.format(
- self.test_params["iperf_duration"])
- if test_params[2] == "UDP":
- self.iperf_args = self.iperf_args + "-u -b {}".format(
- self.test_params["UDP_rates"][mode])
- if test_params[3] == "DL":
- self.iperf_args = self.iperf_args + ' -R'
- self.use_client_output = True
- else:
- self.use_client_output = False
- rvr_result = self.rvr_test_func(channel, mode)
- self.post_process_results(rvr_result)
+ # Compile test parameters from config and test name
+ testcase_params = self.parse_test_params(self.current_test_name)
+ testcase_params.update(self.testclass_params)
+
+ # Prepare devices and run test
+ self.setup_rvr_test(testcase_params)
+ rvr_result = self.run_rvr_test(testcase_params)
+
+ # Post-process results
+ self.testclass_results.append(rvr_result)
+ self.process_test_results(rvr_result)
self.pass_fail_check(rvr_result)
#Test cases
diff --git a/acts/tests/google/wifi/WifiSensitivityTest.py b/acts/tests/google/wifi/WifiSensitivityTest.py
new file mode 100644
index 0000000..31ec8dc
--- /dev/null
+++ b/acts/tests/google/wifi/WifiSensitivityTest.py
@@ -0,0 +1,412 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import json
+import logging
+import os
+import WifiRvrTest
+from acts import asserts
+from acts import base_test
+from acts import utils
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.wifi import wifi_test_utils as wutils
+from acts.test_utils.wifi import wifi_retail_ap as retail_ap
+
+
+class WifiSensitivityTest(WifiRvrTest.WifiRvrTest):
+ """Class to test WiFi sensitivity tests.
+
+ This class measures WiFi sensitivity per rate. It heavily
+ leverages the WifiRvrTest class and introduces minor differences to set
+ specific rates on the access point, and implements a different pass/fail
+ check. For an example config file to run this test class see
+ example_connectivity_performance_ap_sta.json.
+ """
+
+ VALID_TEST_CONFIGS = {
+ 1: ["legacy", "VHT20"],
+ 2: ["legacy", "VHT20"],
+ 6: ["legacy", "VHT20"],
+ 10: ["legacy", "VHT20"],
+ 11: ["legacy", "VHT20"],
+ 36: ["legacy", "VHT20"],
+ 40: ["legacy", "VHT20"],
+ 44: ["legacy", "VHT20"],
+ 48: ["legacy", "VHT20"],
+ 149: ["legacy", "VHT20"],
+ 153: ["legacy", "VHT20"],
+ 157: ["legacy", "VHT20"],
+ 161: ["legacy", "VHT20"]
+ }
+ VALID_RATES = {
+ "legacy_2GHz": [[54, 1], [48, 1], [36, 1], [24, 1], [18, 1], [12, 1],
+ [11, 1], [9, 1], [6, 1], [5.5, 1], [2, 1], [1, 1]],
+ "legacy_5GHz": [[54, 1], [48, 1], [36, 1], [24, 1], [18, 1], [12, 1],
+ [9, 1], [6, 1]],
+ "HT": [[8, 1], [7, 1], [6, 1], [5, 1], [4, 1], [3, 1], [2, 1], [1, 1],
+ [0, 1], [15, 2], [14, 2], [13, 2], [12, 2], [11, 2], [10, 2],
+ [9, 2], [8, 2]],
+ "VHT": [[9, 1], [8, 1], [7, 1], [6, 1], [5, 1], [4, 1], [3, 1], [2, 1],
+ [1, 1], [0, 1], [9, 2], [8, 2], [7, 2], [6, 2], [5, 2], [4, 2],
+ [3, 2], [2, 2], [1, 2], [0, 2]]
+ }
+
+ def setup_class(self):
+ """Initializes common test hardware and parameters.
+
+ This function initializes hardware and compiles parameters that are
+ common to all tests in this class.
+ """
+ self.client_dut = self.android_devices[-1]
+ req_params = [
+ "RetailAccessPoints", "sensitivity_test_params", "testbed_params"
+ ]
+ opt_params = ["main_network", "golden_files_list"]
+ self.unpack_userparams(req_params, opt_params)
+ self.testclass_params = self.sensitivity_test_params
+ self.num_atten = self.attenuators[0].instrument.num_atten
+ self.iperf_server = self.iperf_servers[0]
+ self.access_points = retail_ap.create(self.RetailAccessPoints)
+ self.access_point = self.access_points[0]
+ self.log.info("Access Point Configuration: {}".format(
+ self.access_point.ap_settings))
+ self.log_path = os.path.join(logging.log_path, "results")
+ utils.create_dir(self.log_path)
+ if not hasattr(self, "golden_files_list"):
+ self.golden_files_list = [
+ os.path.join(self.testbed_params["golden_results_path"],
+ file) for file in os.listdir(
+ self.testbed_params["golden_results_path"])
+ ]
+ self.testclass_results = []
+
+ # Turn WiFi ON
+ for dev in self.android_devices:
+ wutils.wifi_toggle_state(dev, True)
+
+ def pass_fail_check(self, rvr_result):
+ """Checks sensitivity against golden results and decides on pass/fail.
+
+ Args:
+ rvr_result: dict containing attenuation, throughput and other meta
+ data
+ """
+ try:
+ golden_path = next(file_name
+ for file_name in self.golden_files_list
+ if "sensitivity_targets" in file_name)
+ with open(golden_path, 'r') as golden_file:
+ golden_results = json.load(golden_file)
+ except:
+ asserts.fail("Test failed: Golden file not found")
+
+ golden_sensitivity = golden_results[self.current_test_name][
+ "sensitivity"]
+ result_string = "Througput = {}, Sensitivity = {}. Target Sensitivity = {}".format(
+ rvr_result["peak_throughput"], rvr_result["sensitivity"],
+ golden_sensitivity)
+ if rvr_result["sensitivity"] - golden_sensitivity > self.testclass_params["sensitivity_tolerance"]:
+ asserts.fail("Test Failed. {}".format(result_string))
+ else:
+ asserts.explicit_pass("Test Passed. {}".format(result_string))
+
+ def process_testclass_results(self):
+ """Saves and plots test results from all executed test cases."""
+ testclass_results_dict = collections.OrderedDict()
+ for result in self.testclass_results:
+ testclass_results_dict[result["test_name"]] = {
+ "peak_throughput": result["peak_throughput"],
+ "range": result["range"],
+ "sensitivity": result["sensitivity"]
+ }
+ results_file_path = os.path.join(self.log_path, 'results.json')
+ with open(results_file_path, 'w') as results_file:
+ json.dump(testclass_results_dict, results_file, indent=4)
+ super().process_testclass_results()
+
+ def process_test_results(self, rvr_result):
+ """Post processes RvR results to compute sensitivity.
+
+ Takes in the results of the RvR tests and computes the sensitivity of
+ the current rate by looking at the point at which throughput drops
+ below the percentage specified in the config file. The function then
+ calls on its parent class process_test_results to plot the result.
+
+ Args:
+ rvr_result: dict containing attenuation, throughput and other meta
+ data
+ """
+ rvr_result["peak_throughput"] = max(rvr_result["throughput_receive"])
+ throughput_check = [
+ throughput < rvr_result["peak_throughput"] *
+ (self.testclass_params["throughput_pct_at_sensitivity"] / 100)
+ for throughput in rvr_result["throughput_receive"]
+ ]
+ consistency_check = [
+ idx for idx in range(len(throughput_check))
+ if all(throughput_check[idx:])
+ ]
+ rvr_result["atten_at_range"] = rvr_result["attenuation"][
+ consistency_check[0] - 1]
+ rvr_result["range"] = rvr_result["fixed_attenuation"] + (
+ rvr_result["atten_at_range"])
+ rvr_result["sensitivity"] = self.testclass_params["ap_tx_power"] - (
+ rvr_result["range"])
+ super().process_test_results(rvr_result)
+
+ def setup_ap(self, testcase_params):
+ """Sets up the access point in the configuration required by the test.
+
+ Args:
+ testcase_params: dict containing AP and other test params
+ """
+ band = self.access_point.band_lookup_by_channel(
+ testcase_params["channel"])
+ if "2G" in band:
+ frequency = wutils.WifiEnums.channel_2G_to_freq[testcase_params[
+ "channel"]]
+ else:
+ frequency = wutils.WifiEnums.channel_5G_to_freq[testcase_params[
+ "channel"]]
+ if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
+ self.access_point.set_region(self.testbed_params["DFS_region"])
+ else:
+ self.access_point.set_region(self.testbed_params["default_region"])
+ self.access_point.set_channel(band, testcase_params["channel"])
+ self.access_point.set_bandwidth(band, testcase_params["mode"])
+ self.access_point.set_power(band, testcase_params["ap_tx_power"])
+ self.access_point.set_rate(
+ band, testcase_params["mode"], testcase_params["num_streams"],
+ testcase_params["rate"], testcase_params["short_gi"])
+ self.log.info("Access Point Configuration: {}".format(
+ self.access_point.ap_settings))
+
+ def get_start_atten(self):
+ """Gets the starting attenuation for this sensitivity test.
+
+ The function gets the starting attenuation by checking whether a test
+ at the next higher MCS has been executed. If so, it sets the starting
+ point a configurable number of dB below that test's attenuation at range.
+
+ Returns:
+ start_atten: starting attenuation for current test
+ """
+ # Get the current and reference test config. The reference test is the
+ # one performed at the current MCS+1
+ current_test_params = self.parse_test_params(self.current_test_name)
+ ref_test_params = current_test_params.copy()
+ if "legacy" in current_test_params["mode"] and current_test_params["rate"] < 54:
+ if current_test_params["channel"] <= 13:
+ ref_index = self.VALID_RATES["legacy_2GHz"].index(
+ [current_test_params["rate"], 1]) - 1
+ ref_test_params["rate"] = self.VALID_RATES["legacy_2GHz"][
+ ref_index][0]
+ else:
+ ref_index = self.VALID_RATES["legacy_5GHz"].index(
+ [current_test_params["rate"], 1]) - 1
+ ref_test_params["rate"] = self.VALID_RATES["legacy_5GHz"][
+ ref_index][0]
+ else:
+ ref_test_params["rate"] = ref_test_params["rate"] + 1
+
+ # Check if reference test has been run and set attenuation accordingly
+ previous_params = [
+ self.parse_test_params(result["test_name"])
+ for result in self.testclass_results
+ ]
+ try:
+ ref_index = previous_params.index(ref_test_params)
+ start_atten = self.testclass_results[ref_index]["atten_at_range"] - (
+ self.testclass_params["adjacent_mcs_range_gap"])
+ except:
+ print("Reference test not found. Starting from {} dB".format(
+ self.testclass_params["atten_start"]))
+ start_atten = self.testclass_params["atten_start"]
+ return start_atten
+
+ def parse_test_params(self, test_name):
+ """Function that generates test params based on the test name."""
+ test_name_params = test_name.split("_")
+ testcase_params = collections.OrderedDict()
+ testcase_params["channel"] = int(test_name_params[2][2:])
+ testcase_params["mode"] = test_name_params[3]
+ if "legacy" in testcase_params["mode"].lower():
+ testcase_params["rate"] = float(
+ str(test_name_params[4]).replace("p", "."))
+ else:
+ testcase_params["rate"] = int(test_name_params[4][3:])
+ testcase_params["num_streams"] = int(test_name_params[5][3:])
+ testcase_params["short_gi"] = 0
+ if self.testclass_params["traffic_type"] == "UDP":
+ testcase_params["iperf_args"] = '-i 1 -t {} -J -u -b {} -R'.format(
+ self.testclass_params["iperf_duration"],
+ self.testclass_params["UDP_rates"][testcase_params["mode"]])
+ else:
+ testcase_params["iperf_args"] = '-i 1 -t {} -J -R'.format(
+ self.testclass_params["iperf_duration"])
+ testcase_params["use_client_output"] = True
+ return testcase_params
+
+ def _test_sensitivity(self):
+ """ Function that gets called for each test case
+
+ The function gets called in each sensitivity test case. It customizes
+ the underlying RvR run based on the name of the test that called it
+ """
+ # Compile test parameters from config and test name
+ testcase_params = self.parse_test_params(self.current_test_name)
+ testcase_params.update(self.testclass_params)
+ testcase_params["atten_start"] = self.get_start_atten()
+
+ # Prepare devices and run test
+ self.setup_rvr_test(testcase_params)
+ rvr_result = self.run_rvr_test(testcase_params)
+
+ # Post-process results
+ self.process_test_results(rvr_result)
+ self.testclass_results.append(rvr_result)
+ self.pass_fail_check(rvr_result)
+ # Results were added to testclass_results above, before the pass/fail check
+
+ def generate_test_cases(self, channels):
+ """Function that auto-generates test cases for a test class."""
+ testcase_wrapper = self._test_sensitivity
+ for channel in channels:
+ for mode in self.VALID_TEST_CONFIGS[channel]:
+ if "VHT" in mode:
+ rates = self.VALID_RATES["VHT"]
+ elif "HT" in mode:
+ rates = self.VALID_RATES["HT"]
+ elif "legacy" in mode and channel < 14:
+ rates = self.VALID_RATES["legacy_2GHz"]
+ elif "legacy" in mode and channel > 14:
+ rates = self.VALID_RATES["legacy_5GHz"]
+ else:
+ raise ValueError("Invalid test mode.")
+ for rate in rates:
+ if "legacy" in mode:
+ testcase_name = "test_sensitivity_ch{}_{}_{}_nss{}".format(
+ channel, mode,
+ str(rate[0]).replace(".", "p"), rate[1])
+ else:
+ testcase_name = "test_sensitivity_ch{}_{}_mcs{}_nss{}".format(
+ channel, mode, rate[0], rate[1])
+ setattr(self, testcase_name, testcase_wrapper)
+ self.tests.append(testcase_name)
+
+
+class WifiSensitivity_AllChannels_Test(WifiSensitivityTest):
+ def __init__(self, controllers):
+ base_test.BaseTestClass.__init__(self, controllers)
+ self.generate_test_cases(
+ [1, 2, 6, 10, 11, 36, 40, 44, 48, 149, 153, 157, 161])
+
+
+class WifiSensitivity_2GHz_Test(WifiSensitivityTest):
+ def __init__(self, controllers):
+ base_test.BaseTestClass.__init__(self, controllers)
+ self.generate_test_cases([1, 2, 6, 10, 11])
+
+
+class WifiSensitivity_UNII1_Test(WifiSensitivityTest):
+ def __init__(self, controllers):
+ base_test.BaseTestClass.__init__(self, controllers)
+ self.generate_test_cases([36, 40, 44, 48])
+
+
+class WifiSensitivity_UNII3_Test(WifiSensitivityTest):
+ def __init__(self, controllers):
+ base_test.BaseTestClass.__init__(self, controllers)
+ self.generate_test_cases([149, 153, 157, 161])
+
+
+class WifiSensitivity_ch1_Test(WifiSensitivityTest):
+ def __init__(self, controllers):
+ base_test.BaseTestClass.__init__(self, controllers)
+ self.generate_test_cases([1])
+
+
+class WifiSensitivity_ch2_Test(WifiSensitivityTest):
+ def __init__(self, controllers):
+ base_test.BaseTestClass.__init__(self, controllers)
+ self.generate_test_cases([2])
+
+
+class WifiSensitivity_ch6_Test(WifiSensitivityTest):
+ def __init__(self, controllers):
+ base_test.BaseTestClass.__init__(self, controllers)
+ self.generate_test_cases([6])
+
+
+class WifiSensitivity_ch10_Test(WifiSensitivityTest):
+ def __init__(self, controllers):
+ base_test.BaseTestClass.__init__(self, controllers)
+ self.generate_test_cases([10])
+
+
+class WifiSensitivity_ch11_Test(WifiSensitivityTest):
+ def __init__(self, controllers):
+ base_test.BaseTestClass.__init__(self, controllers)
+ self.generate_test_cases([11])
+
+
+class WifiSensitivity_ch36_Test(WifiSensitivityTest):
+ def __init__(self, controllers):
+ base_test.BaseTestClass.__init__(self, controllers)
+ self.generate_test_cases([36])
+
+
+class WifiSensitivity_ch40_Test(WifiSensitivityTest):
+ def __init__(self, controllers):
+ base_test.BaseTestClass.__init__(self, controllers)
+ self.generate_test_cases([40])
+
+
+class WifiSensitivity_ch44_Test(WifiSensitivityTest):
+ def __init__(self, controllers):
+ base_test.BaseTestClass.__init__(self, controllers)
+ self.generate_test_cases([44])
+
+
+class WifiSensitivity_ch48_Test(WifiSensitivityTest):
+ def __init__(self, controllers):
+ base_test.BaseTestClass.__init__(self, controllers)
+ self.generate_test_cases([48])
+
+
+class WifiSensitivity_ch149_Test(WifiSensitivityTest):
+ def __init__(self, controllers):
+ base_test.BaseTestClass.__init__(self, controllers)
+ self.generate_test_cases([149])
+
+
+class WifiSensitivity_ch153_Test(WifiSensitivityTest):
+ def __init__(self, controllers):
+ base_test.BaseTestClass.__init__(self, controllers)
+ self.generate_test_cases([153])
+
+
+class WifiSensitivity_ch157_Test(WifiSensitivityTest):
+ def __init__(self, controllers):
+ base_test.BaseTestClass.__init__(self, controllers)
+ self.generate_test_cases([157])
+
+
+class WifiSensitivity_ch161_Test(WifiSensitivityTest):
+ def __init__(self, controllers):
+ base_test.BaseTestClass.__init__(self, controllers)
+ self.generate_test_cases([161])