Improvements to RvR tests and minor bokeh changes
Some improvements to RvR tests predominantly in plotting and
post processing results including the pass/fail criterion. The
test class now supports calibration data so that results from
different test racks can be compared to golden results. The
CL also includes updates to bokeh plotting to remove warnings
and become compatible with newer versions of bokeh.
Test: Done
Bug: 65563975
Change-Id: I3163ede132e248333e20647981e16c90a70de7ac
(cherry picked from commit 71e5783f45510b53f2662fe7bf55043da80b9508)
diff --git a/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py b/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py
index 7fdbbf0..50ef6dc 100644
--- a/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py
+++ b/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py
@@ -24,6 +24,7 @@
from acts.test_utils.wifi import wifi_test_utils as wutils
from bokeh.layouts import layout
from bokeh.models import CustomJS, ColumnDataSource
+from bokeh.models import tools as bokeh_tools
from bokeh.models.widgets import DataTable, TableColumn
from bokeh.plotting import figure, output_file, save
from acts.controllers.ap_lib import hostapd_security
@@ -212,15 +213,14 @@
color = ['navy'] * len(current_data)
#Preparing the data and source link for bokehn java callback
- source = ColumnDataSource(
- data=dict(x0=time_relative, y0=current_data, color=color))
- s2 = ColumnDataSource(
- data=dict(
- z0=[mon_info['duration']],
- y0=[round(avg_current, 2)],
- x0=[round(avg_current * voltage, 2)],
- z1=[round(avg_current * voltage * mon_info['duration'], 2)],
- z2=[round(avg_current * mon_info['duration'], 2)]))
+ source = ColumnDataSource(data=dict(
+ x0=time_relative, y0=current_data, color=color))
+ s2 = ColumnDataSource(data=dict(
+ z0=[mon_info['duration']],
+ y0=[round(avg_current, 2)],
+ x0=[round(avg_current * voltage, 2)],
+ z1=[round(avg_current * voltage * mon_info['duration'], 2)],
+ z2=[round(avg_current * mon_info['duration'], 2)]))
#Setting up data table for the output
columns = [
TableColumn(field='z0', title='Total Duration (s)'),
@@ -234,15 +234,16 @@
plot_title = file_path[file_path.rfind('/') + 1:-4] + tag
output_file("%s/%s.html" % (mon_info['data_path'], plot_title))
- TOOLS = ('box_zoom,box_select,pan,crosshair,redo,undo,resize,reset,'
- 'hover,xwheel_zoom,ywheel_zoom,save')
+ TOOLS = ('box_zoom,box_select,pan,crosshair,redo,undo,reset,hover,save')
# Create a new plot with the datatable above
plot = figure(
plot_width=1300,
plot_height=700,
title=plot_title,
tools=TOOLS,
- webgl=True)
+ output_backend="webgl")
+ plot.add_tools(bokeh_tools.WheelZoomTool(dimensions="width"))
+ plot.add_tools(bokeh_tools.WheelZoomTool(dimensions="height"))
plot.line('x0', 'y0', source=source, line_width=2)
plot.circle('x0', 'y0', source=source, size=0.5, fill_color='color')
plot.xaxis.axis_label = 'Time (s)'
@@ -393,14 +394,15 @@
Returns:
plot: bokeh plot figure object
"""
- TOOLS = ('box_zoom,box_select,pan,crosshair,redo,undo,resize,reset,'
- 'hover,xwheel_zoom,ywheel_zoom,save')
+ TOOLS = ('box_zoom,box_select,pan,crosshair,redo,undo,reset,hover,save')
plot = figure(
plot_width=1300,
plot_height=700,
title=fig_property['title'],
tools=TOOLS,
- webgl=True)
+ output_backend="webgl")
+ plot.add_tools(bokeh_tools.WheelZoomTool(dimensions="width"))
+ plot.add_tools(bokeh_tools.WheelZoomTool(dimensions="height"))
colors = [
'red', 'green', 'blue', 'olive', 'orange', 'salmon', 'black', 'navy',
'yellow', 'darkred', 'goldenrod'
diff --git a/acts/tests/google/wifi/WifiRvrTest.py b/acts/tests/google/wifi/WifiRvrTest.py
index 7cf74eb..a378813 100644
--- a/acts/tests/google/wifi/WifiRvrTest.py
+++ b/acts/tests/google/wifi/WifiRvrTest.py
@@ -48,17 +48,46 @@
utils.create_dir(self.log_path)
self.log.info("Access Point Configuration: {}".format(
self.access_point.ap_settings))
+ self.testclass_results = []
def teardown_test(self):
self.iperf_server.stop()
+ def teardown_class(self):
+ """Saves plot with all test results to enable comparison.
+ """
+ # Plot and save all results
+ x_data = []
+ y_data = []
+ legends = []
+ for result in self.testclass_results:
+ total_attenuation = [
+ att + result["fixed_attenuation"]
+ for att in result["attenuation"]
+ ]
+ x_data.append(total_attenuation)
+ y_data.append(result["throughput_receive"])
+ legends.append(result["test_name"])
+ x_label = 'Attenuation (dB)'
+ y_label = 'Throughput (Mbps)'
+ data_sets = [x_data, y_data]
+ fig_property = {
+ "title": "RvR Results",
+ "x_label": x_label,
+ "y_label": y_label,
+ "linewidth": 3,
+ "markersize": 10
+ }
+ output_file_path = "{}/{}.html".format(self.log_path, "rvr_results")
+ wputils.bokeh_plot(data_sets, legends, fig_property, output_file_path)
+
def pass_fail_check(self, rvr_result):
"""Check the test result and decide if it passed or failed.
Checks the RvR test result and compares to a golden file of results for
the same configuration. The pass/fail tolerances are provided in the
config file. Currently, the test fails if any a single point is out of
- range of the corresponding point in the golden file.
+ range of the corresponding area in the golden file.
Args:
rvr_result: dict containing attenuation, throughput and other meta
@@ -69,21 +98,45 @@
"{}.json".format(test_name))
with open(gldn_path, 'r') as gldn_file:
gldn_results = json.load(gldn_file)
+ gldn_attenuation = [
+ att + gldn_results["fixed_attenuation"]
+ for att in gldn_results["attenuation"]
+ ]
for idx, current_throughput in enumerate(
rvr_result["throughput_receive"]):
- current_att = rvr_result["attenuation"][idx]
- gldn_att_index = gldn_results["attenuation"].index(current_att)
- gldn_throughput = gldn_results["throughput_receive"][
- gldn_att_index]
- abs_difference = abs(current_throughput - gldn_throughput)
- pct_difference = (abs_difference /
- (gldn_throughput + EPSILON)) * 100
- if (abs_difference > self.test_params["abs_tolerance"]
- and pct_difference > self.test_params["pct_tolerance"]):
+ current_att = rvr_result["attenuation"][idx] + rvr_result["fixed_attenuation"]
+ att_distances = [
+ abs(current_att - gldn_att) for gldn_att in gldn_attenuation
+ ]
+ sorted_distances = sorted(
+ enumerate(att_distances), key=lambda x: x[1])
+ closest_indeces = [dist[0] for dist in sorted_distances[0:2]]
+ closest_throughputs = [
+ gldn_results["throughput_receive"][index]
+ for index in closest_indeces
+ ]
+ closest_throughputs.sort()
+
+ allowed_throughput_range = [
+ max(closest_throughputs[0] - max(
+ self.test_params["abs_tolerance"], closest_throughputs[0] *
+ self.test_params["pct_tolerance"] / 100),
+ 0), closest_throughputs[1] +
+ max(self.test_params["abs_tolerance"], closest_throughputs[1] *
+ self.test_params["pct_tolerance"] / 100)
+ ]
+ throughput_distance = [
+ current_throughput - throughput_limit
+ for throughput_limit in allowed_throughput_range
+ ]
+ if (throughput_distance[0] < -self.test_params["abs_tolerance"]
+ or throughput_distance[1] >
+ self.test_params["abs_tolerance"]):
asserts.fail(
"Throughput at {}dB attenuation is beyond limits. "
- "Throughput is {} Mbps. Expected {} Mbps.".format(
- current_att, current_throughput, gldn_throughput))
+ "Throughput is {} Mbps. Expected within {} Mbps.".format(
+ current_att, current_throughput,
+ allowed_throughput_range))
asserts.explicit_pass("Measurement finished for %s." % test_name)
def post_process_results(self, rvr_result):
@@ -95,16 +148,19 @@
"""
# Save output as text file
test_name = self.current_test_name
- results_file_path = "{}/{}.txt".format(self.log_path,
- self.current_test_name)
+ results_file_path = "{}/{}.json".format(self.log_path,
+ self.current_test_name)
with open(results_file_path, 'w') as results_file:
json.dump(rvr_result, results_file)
# Plot and save
- legends = self.current_test_name
+ legends = [self.current_test_name]
x_label = 'Attenuation (dB)'
y_label = 'Throughput (Mbps)'
- data_sets = [[rvr_result["attenuation"]],
- [rvr_result["throughput_receive"]]]
+ total_attenuation = [
+ att + rvr_result["fixed_attenuation"]
+ for att in rvr_result["attenuation"]
+ ]
+ data_sets = [[total_attenuation], [rvr_result["throughput_receive"]]]
fig_property = {
"title": test_name,
"x_label": x_label,
@@ -112,6 +168,20 @@
"linewidth": 3,
"markersize": 10
}
+ try:
+ gldn_path = os.path.join(self.test_params["golden_results_path"],
+ "{}.json".format(test_name))
+ with open(gldn_path, 'r') as gldn_file:
+ gldn_results = json.load(gldn_file)
+ legends.insert(0, "Golden Results")
+ gldn_attenuation = [
+ att + gldn_results["fixed_attenuation"]
+ for att in gldn_results["attenuation"]
+ ]
+ data_sets[0].insert(0, gldn_attenuation)
+ data_sets[1].insert(0, gldn_results["throughput_receive"])
+ except:
+ self.log.warning("ValueError: Golden file not found")
output_file_path = "{}/{}.html".format(self.log_path, test_name)
wputils.bokeh_plot(data_sets, legends, fig_property, output_file_path)
@@ -137,18 +207,18 @@
# Start iperf session
self.iperf_server.start(tag=str(atten))
try:
+ client_output = ""
client_status, client_output = self.dut.run_iperf_client(
self.test_params["iperf_server_address"],
self.iperf_args,
timeout=self.test_params["iperf_duration"] + TEST_TIMEOUT)
- client_output_path = os.path.join(
- self.iperf_server.log_path,
- "iperf_client_output_{}_{}".format(self.current_test_name,
- str(atten)))
- with open(client_output_path, 'w') as out_file:
- out_file.write("\n".join(client_output))
except:
self.log.warning("TimeoutError: Iperf measurement timed out.")
+ client_output_path = os.path.join(
+ self.iperf_server.log_path, "iperf_client_output_{}_{}".format(
+ self.current_test_name, str(atten)))
+ with open(client_output_path, 'w') as out_file:
+ out_file.write("\n".join(client_output))
self.iperf_server.stop()
# Parse and log result
if self.use_client_output:
@@ -198,13 +268,18 @@
# Set attenuator to 0 dB
[self.attenuators[i].set_atten(0) for i in range(self.num_atten)]
# Connect DUT to Network
+ wutils.reset_wifi(self.dut)
self.main_network[band]["channel"] = channel
- wutils.wifi_connect(self.dut, self.main_network[band])
+ wutils.wifi_connect(self.dut, self.main_network[band], num_of_tries=5)
time.sleep(5)
# Run RvR and log result
+ rvr_result["test_name"] = self.current_test_name
rvr_result["ap_settings"] = self.access_point.ap_settings.copy()
rvr_result["attenuation"] = list(self.rvr_atten_range)
+ rvr_result["fixed_attenuation"] = self.test_params[
+ "fixed_attenuation"][str(channel)]
rvr_result["throughput_receive"] = self.rvr_test()
+ self.testclass_results.append(rvr_result)
return rvr_result
def _test_rvr(self):