blob: 31806605abf8e20a0302f2de52f730a7ce5b5d06 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import json
import os
import time
from collections import defaultdict
from acts.metrics.loggers.blackbox import BlackboxMetricLogger
from acts_contrib.test_utils.bt.bt_test_utils import disable_bluetooth
from acts_contrib.test_utils.coex.audio_test_utils import AudioCaptureResult
from acts_contrib.test_utils.coex.audio_test_utils import get_audio_capture_device
from acts_contrib.test_utils.coex.CoexBaseTest import CoexBaseTest
from acts_contrib.test_utils.coex.coex_test_utils import bokeh_chart_plot
from acts_contrib.test_utils.coex.coex_test_utils import collect_bluetooth_manager_dumpsys_logs
from acts_contrib.test_utils.coex.coex_test_utils import multithread_func
from acts_contrib.test_utils.coex.coex_test_utils import wifi_connection_check
from acts_contrib.test_utils.wifi.wifi_test_utils import wifi_connect
from acts_contrib.test_utils.wifi.wifi_test_utils import wifi_test_device_init
from acts.utils import get_current_epoch_time
RSSI_POLL_RESULTS = 'Monitoring , Handle: 0x0003, POLL'
RSSI_RESULTS = 'Monitoring , Handle: 0x0003, '
def get_atten_range(start, stop, step):
"""Function to derive attenuation range for tests.
Args:
start: Start attenuation value.
stop: Stop attenuation value.
step: Step attenuation value.
"""
temp = start
while temp < stop:
yield temp
temp += step
class CoexPerformanceBaseTest(CoexBaseTest):
"""Base test class for performance tests.
Attributes:
rvr : Dict to save attenuation, throughput, fixed_attenuation.
a2dp_streaming : Used to denote a2dp test cases.
"""
def __init__(self, controllers):
super().__init__(controllers)
self.a2dp_streaming = False
self.rvr = {}
self.bt_range_metric = BlackboxMetricLogger.for_test_case(
metric_name='bt_range')
self.wifi_max_atten_metric = BlackboxMetricLogger.for_test_case(
metric_name='wifi_max_atten')
self.wifi_min_atten_metric = BlackboxMetricLogger.for_test_case(
metric_name='wifi_min_atten')
self.wifi_range_metric = BlackboxMetricLogger.for_test_case(
metric_name='wifi_range_metric')
def setup_class(self):
req_params = ['test_params', 'Attenuator']
opt_params = ['audio_params']
self.unpack_userparams(req_params, opt_params)
if hasattr(self, 'Attenuator'):
self.num_atten = self.attenuators[0].instrument.num_atten
else:
self.log.error('Attenuator should be connected to run tests.')
return False
for i in range(self.num_atten):
self.attenuators[i].set_atten(0)
super().setup_class()
self.performance_files_list = []
if "performance_result_path" in self.user_params["test_params"]:
self.performance_files_list = [
os.path.join(self.test_params["performance_result_path"],
files) for files in os.listdir(
self.test_params["performance_result_path"])
]
self.bt_atten_range = list(get_atten_range(
self.test_params["bt_atten_start"],
self.test_params["bt_atten_stop"],
self.test_params["bt_atten_step"]))
self.wifi_atten_range = list(get_atten_range(
self.test_params["attenuation_start"],
self.test_params["attenuation_stop"],
self.test_params["attenuation_step"]))
def setup_test(self):
if ('a2dp_streaming' in self.current_test_name and
hasattr(self, 'audio_params')):
self.audio = get_audio_capture_device(self.sec_ad, self.audio_params)
self.a2dp_streaming = True
for i in range(self.num_atten):
self.attenuators[i].set_atten(0)
if not wifi_connection_check(self.pri_ad, self.network["SSID"]):
wifi_connect(self.pri_ad, self.network, num_of_tries=5)
super().setup_test()
def teardown_test(self):
self.performance_baseline_check()
for i in range(self.num_atten):
self.attenuators[i].set_atten(0)
current_atten = int(self.attenuators[i].get_atten())
self.log.debug(
"Setting attenuation to zero : Current atten {} : {}".format(
self.attenuators[i], current_atten))
self.a2dp_streaming = False
if not disable_bluetooth(self.pri_ad.droid):
self.log.info("Failed to disable bluetooth")
return False
self.destroy_android_and_relay_object()
self.rvr = {}
def teardown_class(self):
self.reset_wifi_and_store_results()
def set_attenuation_and_run_iperf(self, called_func=None):
"""Sets attenuation and runs iperf for Attenuation max value.
Args:
called_func : Function object to run.
Returns:
True if Pass
False if Fail
"""
self.attenuators[self.num_atten - 1].set_atten(0)
self.rvr["bt_attenuation"] = []
self.rvr["test_name"] = self.current_test_name
self.rvr["bt_gap_analysis"] = {}
self.rvr["bt_range"] = []
status_flag = True
for bt_atten in self.bt_atten_range:
self.rvr[bt_atten] = {}
self.rvr[bt_atten]["fixed_attenuation"] = (
self.test_params["fixed_attenuation"][str(
self.network["channel"])])
self.log.info('Setting bt attenuation to: {} dB'.format(bt_atten))
self.attenuators[self.num_atten - 1].set_atten(bt_atten)
for i in range(self.num_atten - 1):
self.attenuators[i].set_atten(0)
if not wifi_connection_check(self.pri_ad, self.network["SSID"]):
wifi_test_device_init(self.pri_ad)
wifi_connect(self.pri_ad, self.network, num_of_tries=5)
adb_rssi_results = self.pri_ad.search_logcat(RSSI_RESULTS)
if adb_rssi_results:
self.log.debug(adb_rssi_results[-1])
self.log.info('Android device: {}'.format(
(adb_rssi_results[-1]['log_message']).split(',')[5]))
(self.rvr[bt_atten]["throughput_received"],
self.rvr[bt_atten]["a2dp_packet_drop"],
status_flag) = self.rvr_throughput(bt_atten, called_func)
self.wifi_max_atten_metric.metric_value = max(self.rvr[bt_atten]
["attenuation"])
self.wifi_min_atten_metric.metric_value = min(self.rvr[bt_atten]
["attenuation"])
if self.rvr[bt_atten]["throughput_received"]:
for i, atten in enumerate(self.rvr[bt_atten]["attenuation"]):
if self.rvr[bt_atten]["throughput_received"][i] < 1.0:
self.wifi_range_metric.metric_value = (
self.rvr[bt_atten]["attenuation"][i-1])
break
else:
self.wifi_range_metric.metric_value = max(
self.rvr[bt_atten]["attenuation"])
else:
self.wifi_range_metric.metric_value = max(
self.rvr[bt_atten]["attenuation"])
if self.a2dp_streaming:
if not any(x > 0 for x in self.a2dp_dropped_list):
self.rvr[bt_atten]["a2dp_packet_drop"] = []
if not self.rvr["bt_range"]:
self.rvr["bt_range"].append(0)
return status_flag
def rvr_throughput(self, bt_atten, called_func=None):
"""Sets attenuation and runs the function passed.
Args:
bt_atten: Bluetooth attenuation.
called_func: Functions object to run parallely.
Returns:
Throughput, a2dp_drops and True/False.
"""
self.iperf_received = []
self.iperf_variables.received = []
self.a2dp_dropped_list = []
self.rvr["bt_attenuation"].append(bt_atten)
self.rvr[bt_atten]["audio_artifacts"] = {}
self.rvr[bt_atten]["attenuation"] = []
self.rvr["bt_gap_analysis"][bt_atten] = {}
for atten in self.wifi_atten_range:
tag = '{}_{}'.format(bt_atten, atten)
self.rvr[bt_atten]["attenuation"].append(
atten + self.rvr[bt_atten]["fixed_attenuation"])
self.log.info('Setting wifi attenuation to: {} dB'.format(atten))
for i in range(self.num_atten - 1):
self.attenuators[i].set_atten(atten)
if not wifi_connection_check(self.pri_ad, self.network["SSID"]):
self.iperf_received.append(0)
return self.iperf_received, self.a2dp_dropped_list, False
time.sleep(5) # Time for attenuation to set.
begin_time = get_current_epoch_time()
if self.a2dp_streaming:
self.audio.start()
if called_func:
if not multithread_func(self.log, called_func):
self.iperf_received.append(float(str(
self.iperf_variables.received[-1]).strip("Mb/s")))
return self.iperf_received, self.a2dp_dropped_list, False
else:
self.run_iperf_and_get_result()
adb_rssi_poll_results = self.pri_ad.search_logcat(
RSSI_POLL_RESULTS, begin_time)
adb_rssi_results = self.pri_ad.search_logcat(
RSSI_RESULTS, begin_time)
if adb_rssi_results:
self.log.debug(adb_rssi_poll_results)
self.log.debug(adb_rssi_results[-1])
self.log.info('Android device: {}'.format((
adb_rssi_results[-1]['log_message']).split(',')[5]))
if self.a2dp_streaming:
self.path = self.audio.stop()
analysis_path = AudioCaptureResult(
self.path).audio_quality_analysis(self.audio_params)
with open(analysis_path) as f:
self.rvr[bt_atten]["audio_artifacts"][atten] = f.readline()
content = json.loads(self.rvr[bt_atten]["audio_artifacts"][atten])
self.rvr["bt_gap_analysis"][bt_atten][atten] = {}
for idx, data in enumerate(content["quality_result"]):
if data['artifacts']['delay_during_playback']:
self.rvr["bt_gap_analysis"][bt_atten][atten][idx] = (
data['artifacts']['delay_during_playback'])
self.rvr["bt_range"].append(bt_atten)
else:
self.rvr["bt_gap_analysis"][bt_atten][atten][idx] = 0
file_path = collect_bluetooth_manager_dumpsys_logs(
self.pri_ad, self.current_test_name)
self.a2dp_dropped_list.append(
self.a2dp_dumpsys.parse(file_path))
self.iperf_received.append(
float(str(self.iperf_variables.throughput[-1]).strip("Mb/s")))
for i in range(self.num_atten - 1):
self.attenuators[i].set_atten(0)
return self.iperf_received, self.a2dp_dropped_list, True
def performance_baseline_check(self):
"""Checks for performance_result_path in config. If present, plots
comparision chart else plot chart for that particular test run.
Returns:
True if success, False otherwise.
"""
if self.rvr:
with open(self.json_file, 'a') as results_file:
json.dump({str(k): v for k, v in self.rvr.items()},
results_file, indent=4, sort_keys=True)
self.bt_range_metric.metric_value = self.rvr["bt_range"][0]
self.log.info('First occurrence of audio gap in bt '
'range: {}'.format(self.bt_range_metric.metric_value))
self.log.info('Bluetooth min range: '
'{} dB'.format(min(self.rvr['bt_attenuation'])))
self.log.info('Bluetooth max range: '
'{} dB'.format(max(self.rvr['bt_attenuation'])))
self.plot_graph_for_attenuation()
if not self.performance_files_list:
self.log.warning('Performance file list is empty. Could not '
'calculate throughput limits')
return
self.throughput_pass_fail_check()
else:
self.log.error("Throughput dict empty!")
return False
return True
def plot_graph_for_attenuation(self):
"""Plots graph and add as JSON formatted results for attenuation with
respect to its iperf values.
"""
data_sets = defaultdict(dict)
legends = defaultdict(list)
x_label = 'WIFI Attenuation (dB)'
y_label = []
fig_property = {
"title": self.current_test_name,
"x_label": x_label,
"linewidth": 3,
"markersize": 10
}
for bt_atten in self.rvr["bt_attenuation"]:
y_label.insert(0, 'Throughput (Mbps)')
legends[bt_atten].insert(
0, str("BT Attenuation @ %sdB" % bt_atten))
data_sets[bt_atten]["attenuation"] = (
self.rvr[bt_atten]["attenuation"])
data_sets[bt_atten]["throughput_received"] = (
self.rvr[bt_atten]["throughput_received"])
if self.a2dp_streaming:
for bt_atten in self.bt_atten_range:
legends[bt_atten].insert(
0, ('Packet drops(in %) @ {}dB'.format(bt_atten)))
data_sets[bt_atten]["a2dp_attenuation"] = (
self.rvr[bt_atten]["attenuation"])
data_sets[bt_atten]["a2dp_packet_drops"] = (
self.rvr[bt_atten]["a2dp_packet_drop"])
y_label.insert(0, "Packets Dropped")
fig_property["y_label"] = y_label
shaded_region = None
if "performance_result_path" in self.user_params["test_params"]:
shaded_region = self.comparision_results_calculation(data_sets, legends)
output_file_path = os.path.join(self.pri_ad.log_path,
self.current_test_name,
"attenuation_plot.html")
bokeh_chart_plot(list(self.rvr["bt_attenuation"]),
data_sets,
legends,
fig_property,
shaded_region=shaded_region,
output_file_path=output_file_path)
def comparision_results_calculation(self, data_sets, legends):
"""Compares rvr results with baseline values by calculating throughput
limits.
Args:
data_sets: including lists of x_data and lists of y_data.
ex: [[[x_data1], [x_data2]], [[y_data1],[y_data2]]]
legends: list of legend for each curve.
Returns:
None if test_file is not found, otherwise shaded_region
will be returned.
"""
try:
attenuation_path = next(
file_name for file_name in self.performance_files_list
if self.current_test_name in file_name
)
except StopIteration:
self.log.warning("Test_file not found. "
"No comparision values to calculate")
return
with open(attenuation_path, 'r') as throughput_file:
throughput_results = json.load(throughput_file)
for bt_atten in self.bt_atten_range:
throughput_received = []
user_attenuation = []
legends[bt_atten].insert(
0, ('Performance Results @ {}dB'.format(bt_atten)))
for att in self.rvr[bt_atten]["attenuation"]:
attenuation = att - self.rvr[bt_atten]["fixed_attenuation"]
throughput_received.append(throughput_results[str(bt_atten)]
["throughput_received"][attenuation])
user_attenuation.append(att)
data_sets[bt_atten][
"user_attenuation"] = user_attenuation
data_sets[bt_atten]["user_throughput"] = throughput_received
throughput_limits = self.get_throughput_limits(attenuation_path)
shaded_region = defaultdict(dict)
for bt_atten in self.bt_atten_range:
shaded_region[bt_atten] = {
"x_vector": throughput_limits[bt_atten]["attenuation"],
"lower_limit":
throughput_limits[bt_atten]["lower_limit"],
"upper_limit":
throughput_limits[bt_atten]["upper_limit"]
}
return shaded_region
def total_attenuation(self, performance_dict):
"""Calculates attenuation with adding fixed attenuation.
Args:
performance_dict: dict containing attenuation and fixed attenuation.
Returns:
Total attenuation is returned.
"""
if "fixed_attenuation" in self.test_params:
total_atten = [
att + performance_dict["fixed_attenuation"]
for att in performance_dict["attenuation"]
]
return total_atten
def throughput_pass_fail_check(self):
"""Check the test result and decide if it passed or failed
by comparing with throughput limits.The pass/fail tolerances are
provided in the config file.
Returns:
None if test_file is not found, True if successful,
False otherwise.
"""
try:
performance_path = next(
file_name for file_name in self.performance_files_list
if self.current_test_name in file_name
)
except StopIteration:
self.log.warning("Test_file not found. Couldn't "
"calculate throughput limits")
return
throughput_limits = self.get_throughput_limits(performance_path)
failure_count = 0
for bt_atten in self.bt_atten_range:
for idx, current_throughput in enumerate(
self.rvr[bt_atten]["throughput_received"]):
current_att = self.rvr[bt_atten]["attenuation"][idx]
if (current_throughput <
(throughput_limits[bt_atten]["lower_limit"][idx]) or
current_throughput >
(throughput_limits[bt_atten]["upper_limit"][idx])):
failure_count = failure_count + 1
self.log.info(
"Throughput at {} dB attenuation is beyond limits. "
"Throughput is {} Mbps. Expected within [{}, {}] Mbps.".
format(
current_att, current_throughput,
throughput_limits[bt_atten]["lower_limit"][idx],
throughput_limits[bt_atten]["upper_limit"][
idx]))
if failure_count >= self.test_params["failure_count_tolerance"]:
self.log.error(
"Test failed. Found {} points outside throughput limits.".
format(failure_count))
return False
self.log.info(
"Test passed. Found {} points outside throughput limits.".
format(failure_count))
return True
def get_throughput_limits(self, performance_path):
"""Compute throughput limits for current test.
Checks the RvR test result and compares to a throughput limits for
the same configuration. The pass/fail tolerances are provided in the
config file.
Args:
performance_path: path to baseline file used to generate limits
Returns:
throughput_limits: dict containing attenuation and throughput
limit data
"""
with open(performance_path, 'r') as performance_file:
performance_results = json.load(performance_file)
throughput_limits = defaultdict(dict)
for bt_atten in self.bt_atten_range:
performance_attenuation = (self.total_attenuation(
performance_results[str(bt_atten)]))
attenuation = []
lower_limit = []
upper_limit = []
for idx, current_throughput in enumerate(
self.rvr[bt_atten]["throughput_received"]):
current_att = self.rvr[bt_atten]["attenuation"][idx]
att_distances = [
abs(current_att - performance_att)
for performance_att in performance_attenuation
]
sorted_distances = sorted(
enumerate(att_distances), key=lambda x: x[1])
closest_indeces = [dist[0] for dist in sorted_distances[0:3]]
closest_throughputs = [
performance_results[str(bt_atten)]["throughput_received"][
index] for index in closest_indeces
]
closest_throughputs.sort()
attenuation.append(current_att)
lower_limit.append(
max(closest_throughputs[0] -
max(self.test_params["abs_tolerance"],
closest_throughputs[0] *
self.test_params["pct_tolerance"] / 100), 0))
upper_limit.append(closest_throughputs[-1] + max(
self.test_params["abs_tolerance"], closest_throughputs[-1] *
self.test_params["pct_tolerance"] / 100))
throughput_limits[bt_atten]["attenuation"] = attenuation
throughput_limits[bt_atten]["lower_limit"] = lower_limit
throughput_limits[bt_atten]["upper_limit"] = upper_limit
return throughput_limits