Refactor to kill iperf client on NUC.
Also optimize chart plotting.
Change-Id: I785e91ec3fff2138a790f4e0746dfbb929df337b
diff --git a/acts/framework/acts/test_utils/coex/CoexPerformanceBaseTest.py b/acts/framework/acts/test_utils/coex/CoexPerformanceBaseTest.py
index 7293533..7c0c7e8 100644
--- a/acts/framework/acts/test_utils/coex/CoexPerformanceBaseTest.py
+++ b/acts/framework/acts/test_utils/coex/CoexPerformanceBaseTest.py
@@ -17,7 +17,7 @@
import json
import os
import time
-
+from collections import defaultdict
from acts.test_utils.bt.bt_test_utils import disable_bluetooth
from acts.test_utils.coex.CoexBaseTest import CoexBaseTest
from acts.test_utils.coex.coex_test_utils import bokeh_chart_plot
@@ -29,17 +29,32 @@
from acts.test_utils.wifi.wifi_test_utils import wifi_test_device_init
+def get_atten_range(start, stop, step):
+ """Build the list of attenuation values to sweep during a test.
+
+ Args:
+ start: Start attenuation value; first element of the returned list.
+ stop: Stop attenuation value; not included in the returned list.
+ step: Step attenuation value; must be non-zero (used as a divisor).
+
+ Returns:
+ list of start + x * step for x in range(int((stop - start) / step)).
+ """
+ atten_step = int((stop - start)/ step)
+ atten_range = [start + x * step for x in range(0, atten_step)]
+ return atten_range
+
class CoexPerformanceBaseTest(CoexBaseTest):
- """Base test class for performance tests
+ """Base test class for performance tests.
Attributes:
rvr : Dict to save attenuation, throughput, fixed_attenuation.
- flag : Used to denote a2dp test cases.
+ a2dp_streaming : Used to denote a2dp test cases.
"""
def __init__(self, controllers):
super().__init__(controllers)
- self.flag = False
+ self.a2dp_streaming = False
self.rvr = {}
def setup_class(self):
@@ -50,21 +65,27 @@
else:
self.log.error("Attenuator should be connected to run tests.")
return False
+ for i in range(self.num_atten):
+ self.attenuators[i].set_atten(0)
super().setup_class()
if "performance_result_path" in self.user_params["test_params"]:
self.performance_files_list = [
os.path.join(self.test_params["performance_result_path"],
- file) for file in os.listdir(
+ files) for files in os.listdir(
self.test_params["performance_result_path"])
]
- self.bt_attenuation_range = range(self.test_params["bt_atten_start"],
- self.test_params["bt_atten_stop"],
- self.test_params["bt_atten_step"])
- self.attenuation_range = range(self.test_params["attenuation_start"],
- self.test_params["attenuation_stop"],
- self.test_params["attenuation_step"])
+ self.bt_atten_range = get_atten_range(
+ self.test_params["bt_atten_start"],
+ self.test_params["bt_atten_stop"],
+ self.test_params["bt_atten_step"])
+ self.wifi_atten_range = get_atten_range(
+ self.test_params["attenuation_start"],
+ self.test_params["attenuation_stop"],
+ self.test_params["attenuation_step"])
def setup_test(self):
+ if "a2dp_streaming" in self.current_test_name:
+ self.a2dp_streaming = True
for i in range(self.num_atten):
self.attenuators[i].set_atten(0)
if not wifi_connection_check(self.pri_ad, self.network["SSID"]):
@@ -73,8 +94,8 @@
def teardown_test(self):
self.performance_baseline_check()
- self.attenuators[self.num_atten - 1].set_atten(0)
- for i in range(self.num_atten - 1):
+ for i in range(self.num_atten):
+ self.attenuators[i].set_atten(0)
current_atten = int(self.attenuators[i].get_atten())
self.log.debug(
"Setting attenuation to zero : Current atten {} : {}".format(
@@ -99,8 +120,12 @@
False if Fail
"""
self.attenuators[self.num_atten - 1].set_atten(0)
- for bt_atten in self.bt_attenuation_range:
+ self.rvr["bt_attenuation"] = []
+ self.rvr["test_name"] = self.current_test_name
+ for bt_atten in self.bt_atten_range:
self.rvr[bt_atten] = {}
+ self.rvr[bt_atten]["fixed_attenuation"] = (
+ self.test_params["fixed_attenuation"][str(self.network["channel"])])
self.log.info("Setting bt attenuation = {}".format(bt_atten))
self.attenuators[self.num_atten - 1].set_atten(bt_atten)
for i in range(self.num_atten - 1):
@@ -108,21 +133,12 @@
if not wifi_connection_check(self.pri_ad, self.network["SSID"]):
wifi_test_device_init(self.pri_ad)
wifi_connect(self.pri_ad, self.network, num_of_tries=5)
- self.rvr[bt_atten]["attenuation"] = list(self.attenuation_range)
- if "a2dp_streaming" in self.current_test_name:
- (self.rvr[bt_atten]["throughput_received"],
+ (self.rvr[bt_atten]["throughput_received"],
self.rvr[bt_atten]["a2dp_packet_drop"]) = (
self.rvr_throughput(bt_atten, called_func))
- if all(x <= 0 for x in self.a2dp_dropped_list):
+ if self.a2dp_streaming:
+ if not any(x > 0 for x in self.a2dp_dropped_list):
self.rvr[bt_atten]["a2dp_packet_drop"] = []
- else:
- self.rvr[bt_atten]["throughput_received"] = (
- self.rvr_throughput(bt_atten, called_func))
- self.rvr[bt_atten]["fixed_attenuation"] = (
- self.test_params["fixed_attenuation"][str(
- self.network["channel"])])
- self.rvr["test_name"] = self.current_test_name
- self.rvr["bt_attenuation"] = list(self.bt_attenuation_range)
return True
def rvr_throughput(self, bt_atten, called_func=None):
@@ -138,13 +154,17 @@
self.iperf_received = []
self.iperf_variables.received = []
self.a2dp_dropped_list = []
+ self.rvr["bt_attenuation"].append(bt_atten)
self.rvr[bt_atten]["audio_artifacts"] = {}
- for atten in self.attenuation_range:
+ self.rvr[bt_atten]["attenuation"] = []
+ for atten in self.wifi_atten_range:
+ self.rvr[bt_atten]["attenuation"].append(
+ atten + self.rvr[bt_atten]["fixed_attenuation"])
self.log.info("Setting attenuation = {}".format(atten))
for i in range(self.num_atten - 1):
self.attenuators[i].set_atten(atten)
if not wifi_connection_check(self.pri_ad, self.network["SSID"]):
- return self.iperf_received
+ return self.iperf_received, self.a2dp_dropped_list
time.sleep(5) # Time for attenuation to set.
if called_func:
if not multithread_func(self.log, called_func):
@@ -152,7 +172,7 @@
return self.iperf_received, self.a2dp_dropped_list
else:
self.run_iperf_and_get_result()
- if "a2dp_streaming" in self.current_test_name:
+ if self.a2dp_streaming:
analysis_path = self.audio.audio_quality_analysis(self.log_path)
with open(analysis_path) as f:
self.rvr[bt_atten]["audio_artifacts"][atten] = f.readline()
@@ -161,18 +181,11 @@
self.a2dp_dropped_list.append(
self.a2dp_dumpsys.parse(file_path))
self.teardown_result()
- if self.iperf_variables.received[-1] == "Iperf Failed":
- self.iperf_received.append(0)
- else:
- self.iperf_received.append(
- float(self.iperf_variables.received[-1].strip("Mb/s")))
- self.iperf_variables.received = []
+ self.iperf_received.append(
+ float(str(self.iperf_variables.received[-1]).strip("Mb/s")))
for i in range(self.num_atten - 1):
self.attenuators[i].set_atten(0)
- if "a2dp_streaming" in self.current_test_name:
- return self.iperf_received, self.a2dp_dropped_list
- else:
- return self.iperf_received
+ return self.iperf_received, self.a2dp_dropped_list
def performance_baseline_check(self):
"""Checks for performance_result_path in config. If present, plots
@@ -181,9 +194,6 @@
Returns:
True if success, False otherwise.
"""
- self.flag = False
- if "a2dp_streaming" in self.current_test_name:
- self.flag = True
if self.rvr:
with open(self.json_file, 'a') as results_file:
json.dump(self.rvr, results_file, indent=4)
@@ -199,11 +209,11 @@
respect to its iperf values. Compares rvr results with baseline
values by calculating throughput limits.
"""
- data_sets = {}
+ data_sets = defaultdict(dict)
test_name = self.current_test_name
x_label = 'WIFI Attenuation (dB)'
y_label = []
- legends = {}
+ legends = defaultdict(list)
fig_property = {
"title": test_name,
"x_label": x_label,
@@ -211,14 +221,12 @@
"markersize": 10
}
- for bt_atten in self.bt_attenuation_range:
- data_sets[bt_atten] = {}
- legends[bt_atten] = []
- total_atten = self.total_attenuation(self.rvr[bt_atten])
+ for bt_atten in self.rvr["bt_attenuation"]:
y_label.insert(0, 'Throughput (Mbps)')
legends[bt_atten].insert(
0, str("BT Attenuation @ %sdB" % bt_atten))
- data_sets[bt_atten]["attenuation"] = total_atten
+ data_sets[bt_atten]["attenuation"] = (
+ self.rvr[bt_atten]["attenuation"])
data_sets[bt_atten]["throughput_received"] = (
self.rvr[bt_atten]["throughput_received"])
shaded_region = None
@@ -232,7 +240,7 @@
attenuation_path = attenuation_path[0]
with open(attenuation_path, 'r') as throughput_file:
throughput_results = json.load(throughput_file)
- for bt_atten in self.bt_attenuation_range:
+ for bt_atten in self.bt_atten_range:
throughput_received = []
legends[bt_atten].insert(
0, ('Performance Results @ {}dB'.format(bt_atten)))
@@ -248,8 +256,8 @@
"user_attenuation"] = throughput_attenuation
data_sets[bt_atten]["user_throughput"] = throughput_received
throughput_limits = self.get_throughput_limits(attenuation_path)
- shaded_region = {}
- for bt_atten in self.bt_attenuation_range:
+ shaded_region = defaultdict(dict)
+ for bt_atten in self.bt_atten_range:
shaded_region[bt_atten] = {}
shaded_region[bt_atten] = {
"x_vector": throughput_limits[bt_atten]["attenuation"],
@@ -262,11 +270,12 @@
shaded_region = None
self.log.warning("ValueError: Performance file not found")
- if self.flag:
- for bt_atten in self.bt_attenuation_range:
+ if self.a2dp_streaming:
+ for bt_atten in self.bt_atten_range:
legends[bt_atten].insert(
0, ('Packet drops(in %) @ {}dB'.format(bt_atten)))
- data_sets[bt_atten]["a2dp_attenuation"] = total_atten
+ data_sets[bt_atten]["a2dp_attenuation"] = (
+ self.rvr[bt_atten]["attenuation"])
data_sets[bt_atten]["a2dp_packet_drops"] = (
self.rvr[bt_atten]["a2dp_packet_drop"])
y_label.insert(0, "Packets Dropped")
@@ -274,7 +283,7 @@
output_file_path = os.path.join(self.pri_ad.log_path, test_name,
"attenuation_plot.html")
bokeh_chart_plot(
- list(self.bt_attenuation_range),
+ list(self.rvr["bt_attenuation"]),
data_sets,
legends,
fig_property,
@@ -290,11 +299,12 @@
Returns:
Total attenuation is returned.
"""
- total_atten = [
- att + performance_dict["fixed_attenuation"]
- for att in performance_dict["attenuation"]
- ]
- return total_atten
+ if "fixed_attenuation" in self.test_params:
+ total_atten = [
+ att + performance_dict["fixed_attenuation"]
+ for att in performance_dict["attenuation"]
+ ]
+ return total_atten
def throughput_pass_fail_check(self):
"""Check the test result and decide if it passed or failed
@@ -314,15 +324,15 @@
throughput_limits = self.get_throughput_limits(performance_path)
failure_count = 0
- for bt_atten in self.bt_attenuation_range:
+ for bt_atten in self.bt_atten_range:
for idx, current_throughput in enumerate(
self.rvr[bt_atten]["throughput_received"]):
current_att = self.rvr[bt_atten]["attenuation"][idx] + (
self.rvr[bt_atten]["fixed_attenuation"])
if (current_throughput <
- (throughput_limits[bt_atten]["lower_limit"][idx]) or
+ (throughput_limits[bt_atten]["lower_limit"][idx]) or
current_throughput >
- (throughput_limits[bt_atten]["upper_limit"][idx])):
+ (throughput_limits[bt_atten]["upper_limit"][idx])):
failure_count = failure_count + 1
self.log.info(
"Throughput at {} dB attenuation is beyond limits. "
@@ -341,7 +351,8 @@
"Test passed. Found {} points outside throughput limits.".
format(failure_count))
except Exception as e:
- self.log.warning("ValueError: Performance file not found")
+ self.log.warning("ValueError: Performance file not found cannot "
+ "calculate throughput limits")
def get_throughput_limits(self, performance_path):
"""Compute throughput limits for current test.
@@ -359,9 +370,8 @@
"""
with open(performance_path, 'r') as performance_file:
performance_results = json.load(performance_file)
- throughput_limits = {}
- for bt_atten in self.bt_attenuation_range:
- throughput_limits[bt_atten] = {}
+ throughput_limits = defaultdict(dict)
+ for bt_atten in self.bt_atten_range:
performance_attenuation = (self.total_attenuation(
performance_results[str(bt_atten)]))
attenuation = []