Snap for 4829593 from eeb293bf3cbaf297a94378857bee8eb885cb719e to pi-release
Change-Id: I388c726e89725e918c625abbaa4a90bbf712c00f
diff --git a/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py b/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py
index 421c7ce..b7b9fe8 100644
--- a/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py
+++ b/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py
@@ -313,9 +313,9 @@
index_now = legends.index(legend)
color = colors[index_now % len(colors)]
plot.line(
- x_data, y_data, legend=str(legend), line_width=3, color=color)
+ x_data, y_data, legend=str(legend), line_width=fig_property['linewidth'], color=color)
plot.circle(
- x_data, y_data, size=10, legend=str(legend), fill_color=color)
+ x_data, y_data, size=fig_property['markersize'], legend=str(legend), fill_color=color)
#Plot properties
plot.xaxis.axis_label = fig_property['x_label']
diff --git a/acts/tests/google/ble/bt5/AdvertisingSetTest.py b/acts/tests/google/ble/bt5/AdvertisingSetTest.py
index 4427bb5..0f0c352 100644
--- a/acts/tests/google/ble/bt5/AdvertisingSetTest.py
+++ b/acts/tests/google/ble/bt5/AdvertisingSetTest.py
@@ -43,7 +43,6 @@
class AdvertisingSetTest(BluetoothBaseTest):
default_timeout = 10
- max_scan_instances = 28
report_delay = 2000
scan_callbacks = []
adv_callbacks = []
diff --git a/acts/tests/google/ble/bt5/Bt5ScanTest.py b/acts/tests/google/ble/bt5/Bt5ScanTest.py
index 7f7b7dc..633489f 100644
--- a/acts/tests/google/ble/bt5/Bt5ScanTest.py
+++ b/acts/tests/google/ble/bt5/Bt5ScanTest.py
@@ -41,7 +41,6 @@
class Bt5ScanTest(BluetoothBaseTest):
default_timeout = 10
- max_scan_instances = 28
report_delay = 2000
scan_callbacks = []
adv_callbacks = []
diff --git a/acts/tests/google/ble/concurrency/ConcurrentBleScanningTest.py b/acts/tests/google/ble/concurrency/ConcurrentBleScanningTest.py
index 24e078a..1972806 100644
--- a/acts/tests/google/ble/concurrency/ConcurrentBleScanningTest.py
+++ b/acts/tests/google/ble/concurrency/ConcurrentBleScanningTest.py
@@ -37,7 +37,7 @@
class ConcurrentBleScanningTest(BluetoothBaseTest):
default_timeout = 20
- max_concurrent_scans = 28
+ max_concurrent_scans = 27
def __init__(self, controllers):
BluetoothBaseTest.__init__(self, controllers)
diff --git a/acts/tests/google/ble/scan/BleBackgroundScanTest.py b/acts/tests/google/ble/scan/BleBackgroundScanTest.py
index cf6afc1..55fb86b 100644
--- a/acts/tests/google/ble/scan/BleBackgroundScanTest.py
+++ b/acts/tests/google/ble/scan/BleBackgroundScanTest.py
@@ -39,7 +39,6 @@
class BleBackgroundScanTest(BluetoothBaseTest):
default_timeout = 10
- max_scan_instances = 28
report_delay = 2000
scan_callbacks = []
adv_callbacks = []
diff --git a/acts/tests/google/ble/scan/BleOnLostOnFoundTest.py b/acts/tests/google/ble/scan/BleOnLostOnFoundTest.py
index 0896c6d..cdca99c 100644
--- a/acts/tests/google/ble/scan/BleOnLostOnFoundTest.py
+++ b/acts/tests/google/ble/scan/BleOnLostOnFoundTest.py
@@ -36,7 +36,6 @@
class BleOnLostOnFoundTest(BluetoothBaseTest):
default_timeout = 10
- max_scan_instances = 28
active_scan_callback_list = []
active_adv_callback_list = []
diff --git a/acts/tests/google/ble/scan/BleOpportunisticScanTest.py b/acts/tests/google/ble/scan/BleOpportunisticScanTest.py
index d407723..e907cbb 100644
--- a/acts/tests/google/ble/scan/BleOpportunisticScanTest.py
+++ b/acts/tests/google/ble/scan/BleOpportunisticScanTest.py
@@ -38,7 +38,7 @@
class BleOpportunisticScanTest(BluetoothBaseTest):
default_timeout = 10
- max_scan_instances = 28
+ max_scan_instances = 27
report_delay = 2000
scan_callbacks = []
adv_callbacks = []
diff --git a/acts/tests/google/ble/scan/BleScanScreenStateTest.py b/acts/tests/google/ble/scan/BleScanScreenStateTest.py
index 33cc758..2593461 100644
--- a/acts/tests/google/ble/scan/BleScanScreenStateTest.py
+++ b/acts/tests/google/ble/scan/BleScanScreenStateTest.py
@@ -39,7 +39,7 @@
class BleScanScreenStateTest(BluetoothBaseTest):
advertise_callback = -1
- max_concurrent_scans = 28
+ max_concurrent_scans = 27
scan_callback = -1
shorter_scan_timeout = 2
diff --git a/acts/tests/google/bt/RfcommTest.py b/acts/tests/google/bt/RfcommTest.py
index 846827b..3160469 100644
--- a/acts/tests/google/bt/RfcommTest.py
+++ b/acts/tests/google/bt/RfcommTest.py
@@ -836,28 +836,3 @@
"""
return self._test_rfcomm_connection_with_uuid(
bt_rfcomm_uuids['mcap_data_channel'])
-
- @BluetoothBaseTest.bt_test_wrap
- @test_tracker_info(uuid='d6c7523d-9247-480e-8154-edd51ae1be50')
- def test_rfcomm_connection_l2cap_uuid(self):
- """Test Bluetooth RFCOMM connection using L2CAP uuid
-
- Test RFCOMM though establishing a basic connection.
-
- Steps:
- 1. Get the mac address of the server device.
- 2. Establish an RFCOMM connection from the client to the server AD.
- 3. Verify that the RFCOMM connection is active from both the client and
- server.
-
- Expected Result:
- RFCOMM connection is established then disconnected succcessfully.
-
- Returns:
- Pass if True
- Fail if False
-
- TAGS: Classic, RFCOMM
- Priority: 3
- """
- return self._test_rfcomm_connection_with_uuid(bt_rfcomm_uuids['l2cap'])
diff --git a/acts/tests/google/wifi/WifiRssiTest.py b/acts/tests/google/wifi/WifiRssiTest.py
index cbbd54c..4247344 100644
--- a/acts/tests/google/wifi/WifiRssiTest.py
+++ b/acts/tests/google/wifi/WifiRssiTest.py
@@ -62,7 +62,7 @@
def teardown_test(self):
self.iperf_server.stop()
- def pass_fail_check_rssi_stability(self, rssi_result):
+ def pass_fail_check_rssi_stability(self, postprocessed_results):
"""Check the test result and decide if it passed or failed.
Checks the RSSI test result and fails the test if the standard
@@ -70,86 +70,31 @@
config file.
Args:
- rssi_result: dict containing attenuation, rssi, and other meta
- data. This dict is the output of self.post_process_results
+ postprocessed_results: compiled arrays of RSSI measurements
"""
- # Save output as text file
- test_name = self.current_test_name
- results_file_path = "{}/{}.json".format(self.log_path,
- self.current_test_name)
- with open(results_file_path, 'w') as results_file:
- json.dump(rssi_result, results_file, indent=4)
-
- x_data = []
- y_data = []
- legends = []
- std_deviations = {
- "signal_poll_rssi": [],
- "signal_poll_avg_rssi": [],
- "chain_0_rssi": [],
- "chain_1_rssi": [],
- }
- for data_point in rssi_result["rssi_result"]:
- for key, val in data_point["connected_rssi"].items():
- if type(val) == list:
- x_data.append([
- x * self.test_params["polling_frequency"]
- for x in range(len(val))
- ])
- y_data.append(val)
- legends.append(key)
- std_deviations[key].append(
- data_point["connected_rssi"]["stdev_" + key])
- data_sets = [x_data, y_data]
- x_label = 'Time (s)'
- y_label = 'RSSI (dBm)'
- fig_property = {
- "title": test_name,
- "x_label": x_label,
- "y_label": y_label,
- "linewidth": 3,
- "markersize": 0
- }
- output_file_path = "{}/{}.html".format(self.log_path, test_name)
- wputils.bokeh_plot(
- data_sets,
- legends,
- fig_property,
- shaded_region=None,
- output_file_path=output_file_path)
-
test_failed = any([
stdev > self.test_params["stdev_tolerance"]
- for stdev in std_deviations["signal_poll_rssi"]
+ for stdev in postprocessed_results["signal_poll_rssi"]["stdev"]
])
- if test_failed:
- asserts.fail(
- "RSSI stability failed. Standard deviations were {0} dB "
- "(limit {1}), per chain standard deviation [{2}, {3}] dB".
- format([
+ test_message = (
+ "RSSI stability {0}. Standard deviation was {1} dB "
+ "(limit {2}), per chain standard deviation [{3}, {4}] dB".format(
+ "failed" * test_failed + "passed" * (not test_failed), [
float("{:.2f}".format(x))
- for x in std_deviations["signal_poll_rssi"]
+ for x in postprocessed_results["signal_poll_rssi"]["stdev"]
], self.test_params["stdev_tolerance"], [
float("{:.2f}".format(x))
- for x in std_deviations["chain_0_rssi"]
+ for x in postprocessed_results["chain_0_rssi"]["stdev"]
], [
float("{:.2f}".format(x))
- for x in std_deviations["chain_1_rssi"]
+ for x in postprocessed_results["chain_1_rssi"]["stdev"]
]))
- asserts.explicit_pass(
- "RSSI stability passed. Standard deviations were {0} dB "
- "(limit {1}), per chain standard deviation [{2}, {3}] dB".format([
- float("{:.2f}".format(x))
- for x in std_deviations["signal_poll_rssi"]
- ], self.test_params["stdev_tolerance"], [
- float("{:.2f}".format(x))
- for x in std_deviations["chain_0_rssi"]
- ], [
- float("{:.2f}".format(x))
- for x in std_deviations["chain_1_rssi"]
- ]))
+ if test_failed:
+ asserts.fail(test_message)
+ asserts.explicit_pass(test_message)
- def pass_fail_check_rssi_vs_attenuation(self, postprocessed_results):
+ def pass_fail_check_rssi_accuracy(self, postprocessed_results,
+ rssi_under_test, absolute_accuracy):
"""Check the test result and decide if it passed or failed.
Checks the RSSI test result and compares and compute its deviation from
@@ -159,154 +104,246 @@
configuration file.
Args:
- result: dict containing attenuation, rssi, and other meta
- data. This dict is the output of self.post_process_results
+ postprocessed_results: compiled arrays of RSSI measurements
+ rssi_under_test: list of RSSIs under test, i.e., can cause test to
+ fail
+ absolute_accuracy: boolean indicating whether to look at absolute
+ RSSI accuracy, or centered RSSI accuracy. Centered accuracy is
+ computed after systematic RSSI shifts are removed.
"""
-
- error_data = {
- "signal_poll_rssi": [
- postprocessed_results["mean_signal_poll_rssi"][idx] -
- postprocessed_results["predicted_rssi"][idx]
- for idx in range(len(postprocessed_results["predicted_rssi"]))
- ],
- "signal_poll_avg_rssi": [
- postprocessed_results["mean_signal_poll_avg_rssi"][idx] -
- postprocessed_results["predicted_rssi"][idx]
- for idx in range(len(postprocessed_results["predicted_rssi"]))
- ],
- "scan_rssi": [
- postprocessed_results["mean_scan_rssi"][idx] -
- postprocessed_results["predicted_rssi"][idx]
- for idx in range(len(postprocessed_results["predicted_rssi"]))
- ],
- "chain_0_rssi": [
- postprocessed_results["mean_chain_0_rssi"][idx] + CONST_3dB -
- postprocessed_results["predicted_rssi"][idx]
- for idx in range(len(postprocessed_results["predicted_rssi"]))
- ],
- "chain_1_rssi": [
- postprocessed_results["mean_chain_1_rssi"][idx] + CONST_3dB -
- postprocessed_results["predicted_rssi"][idx]
- for idx in range(len(postprocessed_results["predicted_rssi"]))
- ]
- }
-
test_failed = False
test_message = ""
- for key, val in error_data.items():
+ if absolute_accuracy:
+ error_type = "absolute"
+ else:
+ error_type = "centered"
+
+ for key, val in postprocessed_results.items():
# Compute the error metrics ignoring invalid RSSI readings
# If all readings invalid, set error to RSSI_ERROR_VAL
- filtered_errors = [x for x in val if not math.isnan(x)]
- if filtered_errors:
- avg_error = sum([abs(x) for x in filtered_errors
- ]) / len(filtered_errors)
- avg_shift = sum(filtered_errors) / len(filtered_errors)
- else:
- avg_error = RSSI_ERROR_VAL
- rssi_failure = (avg_error > self.test_params["abs_tolerance"]
- ) or math.isnan(avg_error)
- if rssi_failure and key in self.test_params["rssi_under_test"]:
- test_message = test_message + (
- "{} failed. Average error is {:.2f} dB. "
- "Average shift is {:.2f} dB.\n").format(
- key, avg_error, avg_shift)
- test_failed = True
- elif rssi_failure:
- test_message = test_message + (
- "{} failed (ignored). Average error is {:.2f} dB. "
- "Average shift is {:.2f} dB.\n").format(
- key, avg_error, avg_shift)
- else:
- test_message = test_message + (
- "{} passed. Average error is {:.2f} dB. "
- "Average shift is {:.2f} dB.\n").format(
- key, avg_error, avg_shift)
+ if "rssi" in key and "predicted" not in key:
+ filtered_error = [x for x in val["error"] if not math.isnan(x)]
+ if filtered_error:
+ avg_shift = statistics.mean(filtered_error)
+ if absolute_accuracy:
+ avg_error = statistics.mean(
+ [abs(x) for x in filtered_error])
+ else:
+ avg_error = statistics.mean(
+ [abs(x - avg_shift) for x in filtered_error])
+ else:
+ avg_error = RSSI_ERROR_VAL
+ avg_shift = RSSI_ERROR_VAL
+ rssi_failure = (avg_error > self.test_params["abs_tolerance"]
+ ) or math.isnan(avg_error)
+ if rssi_failure and key in rssi_under_test:
+ test_message = test_message + (
+ "{} failed. Average {} error is {:.2f} dB. "
+ "Average shift is {:.2f} dB.\n").format(
+ key, error_type, avg_error, avg_shift)
+ test_failed = True
+ elif rssi_failure:
+ test_message = test_message + (
+ "{} failed (ignored). Average {} error is {:.2f} dB. "
+ "Average shift is {:.2f} dB.\n").format(
+ key, error_type, avg_error, avg_shift)
+ else:
+ test_message = test_message + (
+ "{} passed. Average {} error is {:.2f} dB. "
+ "Average shift is {:.2f} dB.\n").format(
+ key, error_type, avg_error, avg_shift)
if test_failed:
asserts.fail(test_message)
asserts.explicit_pass(test_message)
- def post_process_rssi_vs_attenuation(self, rssi_result):
- """Saves plots and JSON formatted results.
+ def post_process_rssi_sweep(self, rssi_result):
+ """Postprocesses and saves JSON formatted results.
Args:
rssi_result: dict containing attenuation, rssi and other meta
data
Returns:
- postprocessed_results: compiled arrays of RSSI measurements used in
+ postprocessed_results: compiled arrays of RSSI data used in
pass/fail check
"""
# Save output as text file
- test_name = self.current_test_name
results_file_path = "{}/{}.json".format(self.log_path,
self.current_test_name)
with open(results_file_path, 'w') as results_file:
json.dump(rssi_result, results_file, indent=4)
- # Plot and save
- total_attenuation = [
- att + rssi_result["fixed_attenuation"] +
- rssi_result["dut_front_end_loss"]
- for att in rssi_result["attenuation"]
- ]
# Compile results into arrays of RSSIs suitable for plotting
postprocessed_results = {
- "total_attenuation":
- total_attenuation,
- "mean_signal_poll_rssi": [
- x["connected_rssi"]["mean_signal_poll_rssi"]
- for x in rssi_result["rssi_result"]
- ],
- "mean_signal_poll_avg_rssi": [
- x["connected_rssi"]["mean_signal_poll_avg_rssi"]
- for x in rssi_result["rssi_result"]
- ],
- "mean_scan_rssi": [
- x["scan_rssi"][rssi_result["connected_bssid"]]["avg_rssi"]
- for x in rssi_result["rssi_result"]
- ],
- "mean_chain_0_rssi": [
- x["connected_rssi"]["mean_chain_0_rssi"]
- for x in rssi_result["rssi_result"]
- ],
- "mean_chain_1_rssi": [
- x["connected_rssi"]["mean_chain_1_rssi"]
- for x in rssi_result["rssi_result"]
- ],
- "predicted_rssi":
- [rssi_result["ap_tx_power"] - att for att in total_attenuation]
+ "signal_poll_rssi": {},
+ "signal_poll_avg_rssi": {},
+ "scan_rssi": {},
+ "chain_0_rssi": {},
+ "chain_1_rssi": {},
+ "total_attenuation": [],
+ "predicted_rssi": []
}
+ for key, val in postprocessed_results.items():
+ if "scan_rssi" in key:
+ postprocessed_results[key]["data"] = [
+ x for data_point in rssi_result["rssi_result"] for x in
+ data_point[key][rssi_result["connected_bssid"]]["data"]
+ ]
+ postprocessed_results[key]["mean"] = [
+ x[key][rssi_result["connected_bssid"]]["mean"]
+ for x in rssi_result["rssi_result"]
+ ]
+ postprocessed_results[key]["stdev"] = [
+ x[key][rssi_result["connected_bssid"]]["stdev"]
+ for x in rssi_result["rssi_result"]
+ ]
+ elif "predicted_rssi" in key:
+ postprocessed_results["total_attenuation"] = [
+ att + rssi_result["fixed_attenuation"] +
+ rssi_result["dut_front_end_loss"]
+ for att in rssi_result["attenuation"]
+ ]
+ postprocessed_results["predicted_rssi"] = [
+ rssi_result["ap_tx_power"] - att
+ for att in postprocessed_results["total_attenuation"]
+ ]
+ elif "rssi" in key:
+ postprocessed_results[key]["data"] = [
+ x for data_point in rssi_result["rssi_result"]
+ for x in data_point[key]["data"]
+ ]
+ postprocessed_results[key]["mean"] = [
+ x[key]["mean"] for x in rssi_result["rssi_result"]
+ ]
+ postprocessed_results[key]["stdev"] = [
+ x[key]["stdev"] for x in rssi_result["rssi_result"]
+ ]
+ # Compute RSSI errors
+ for key, val in postprocessed_results.items():
+ if "chain" in key:
+ postprocessed_results[key]["error"] = [
+ postprocessed_results[key]["mean"][idx] + CONST_3dB -
+ postprocessed_results["predicted_rssi"][idx]
+ for idx in range(
+ len(postprocessed_results["predicted_rssi"]))
+ ]
+ elif "rssi" in key and "predicted" not in key:
+ postprocessed_results[key]["error"] = [
+ postprocessed_results[key]["mean"][idx] -
+ postprocessed_results["predicted_rssi"][idx]
+ for idx in range(
+ len(postprocessed_results["predicted_rssi"]))
+ ]
+ return postprocessed_results
+
+ def plot_rssi_vs_attenuation(self, postprocessed_results):
+ """Function to plot RSSI vs attenuation sweeps
+
+ Args:
+ postprocessed_results: compiled arrays of RSSI data.
+ """
data_sets = [[
- total_attenuation, total_attenuation, total_attenuation,
- total_attenuation, total_attenuation, total_attenuation
+ postprocessed_results["total_attenuation"],
+ postprocessed_results["total_attenuation"],
+ postprocessed_results["total_attenuation"],
+ postprocessed_results["total_attenuation"],
+ postprocessed_results["total_attenuation"],
+ postprocessed_results["total_attenuation"]
], [
- postprocessed_results["mean_signal_poll_rssi"],
- postprocessed_results["mean_signal_poll_avg_rssi"],
- postprocessed_results["mean_scan_rssi"],
- postprocessed_results["mean_chain_0_rssi"],
- postprocessed_results["mean_chain_1_rssi"],
+ postprocessed_results["signal_poll_rssi"]["mean"],
+ postprocessed_results["signal_poll_avg_rssi"]["mean"],
+ postprocessed_results["scan_rssi"]["mean"],
+ postprocessed_results["chain_0_rssi"]["mean"],
+ postprocessed_results["chain_1_rssi"]["mean"],
postprocessed_results["predicted_rssi"]
]]
legends = [
"Signal Poll RSSI", "Signal Poll AVG_RSSI", "Scan RSSI",
"Chain 0 RSSI", "Chain 1 RSSI", "Predicted RSSI"
]
- x_label = 'Attenuation (dB)'
- y_label = 'RSSI (dBm)'
fig_property = {
- "title": test_name,
- "x_label": x_label,
- "y_label": y_label,
+ "title": self.current_test_name,
+ "x_label": 'Attenuation (dB)',
+ "y_label": 'RSSI (dBm)',
"linewidth": 3,
"markersize": 10
}
- output_file_path = "{}/{}.html".format(self.log_path, test_name)
+ output_file_path = "{}/{}.html".format(self.log_path,
+ self.current_test_name)
wputils.bokeh_plot(
data_sets,
legends,
fig_property,
shaded_region=None,
output_file_path=output_file_path)
- return postprocessed_results
+
+ def plot_rssi_vs_time(self, rssi_result, postprocessed_results,
+ center_curves):
+ """Function to plot RSSI vs time.
+
+ Args:
+ rssi_result: dict containing raw RSSI data
+ postprocessed_results: compiled arrays of RSSI data
+ center_curvers: boolean indicating whether to shift curves to align
+ them with predicted RSSIs
+ """
+ x_data = []
+ y_data = []
+ legends = []
+ rssi_time_series = {
+ "signal_poll_rssi": [],
+ "signal_poll_avg_rssi": [],
+ "scan_rssi": [],
+ "chain_0_rssi": [],
+ "chain_1_rssi": [],
+ "predicted_rssi": []
+ }
+ for key, val in rssi_time_series.items():
+ if "predicted_rssi" in key:
+ rssi_time_series[key] = [
+ x for x in postprocessed_results[key] for copies in range(
+ len(rssi_result["rssi_result"][0]["signal_poll_rssi"][
+ "data"]))
+ ]
+ elif "rssi" in key:
+ if center_curves:
+ filtered_error = [
+ x for x in postprocessed_results[key]["error"]
+ if not math.isnan(x)
+ ]
+ if filtered_error:
+ avg_shift = statistics.mean(filtered_error)
+ else:
+ avg_shift = 0
+ rssi_time_series[key] = [
+ x - avg_shift
+ for x in postprocessed_results[key]["data"]
+ ]
+ else:
+ rssi_time_series[key] = postprocessed_results[key]["data"]
+ time = [
+ self.test_params["polling_frequency"] * x
+ for x in range(len(rssi_time_series[key]))
+ ]
+ if len(rssi_time_series[key]) > 0:
+ x_data.append(time)
+ y_data.append(rssi_time_series[key])
+ legends.append(key)
+ data_sets = [x_data, y_data]
+ fig_property = {
+ "title": self.current_test_name,
+ "x_label": 'Time (s)',
+ "y_label": center_curves * 'Centered' + 'RSSI (dBm)',
+ "linewidth": 3,
+ "markersize": 0
+ }
+ output_file_path = "{}/{}.html".format(self.log_path,
+ self.current_test_name)
+ wputils.bokeh_plot(
+ data_sets,
+ legends,
+ fig_property,
+ shaded_region=None,
+ output_file_path=output_file_path)
def get_scan_rssi(self, tracked_bssids, num_measurements=1):
"""Gets scan RSSI for specified BSSIDs.
@@ -320,7 +357,7 @@
"""
scan_rssi = {}
for bssid in tracked_bssids:
- scan_rssi[bssid] = {"rssi": [], "avg_rssi": None}
+ scan_rssi[bssid] = {"data": [], "mean": None, "stdev": None}
for idx in range(num_measurements):
scan_output = self.dut.adb.shell(SCAN)
time.sleep(MED_SLEEP)
@@ -330,25 +367,24 @@
bssid + ".*", scan_output, flags=re.IGNORECASE)
if bssid_result:
bssid_result = bssid_result.group(0).split("\t")
- scan_rssi[bssid]["rssi"].append(int(bssid_result[2]))
+ scan_rssi[bssid]["data"].append(int(bssid_result[2]))
else:
- scan_rssi[bssid]["rssi"].append(RSSI_ERROR_VAL)
+ scan_rssi[bssid]["data"].append(RSSI_ERROR_VAL)
# Compute mean RSSIs. Only average valid readings.
# Output RSSI_ERROR_VAL if no readings found.
for key, val in scan_rssi.items():
filtered_rssi_values = [
- x for x in val["rssi"] if not math.isnan(x)
+ x for x in val["data"] if not math.isnan(x)
]
if filtered_rssi_values:
- scan_rssi[key]["avg_rssi"] = sum(filtered_rssi_values) / len(
- filtered_rssi_values)
+ scan_rssi[key]["mean"] = statistics.mean(filtered_rssi_values)
if len(filtered_rssi_values) > 1:
scan_rssi[key]["stdev"] = statistics.stdev(
filtered_rssi_values)
else:
scan_rssi[key]["stdev"] = 0
else:
- scan_rssi[key]["avg_rssi"] = RSSI_ERROR_VAL
+ scan_rssi[key]["mean"] = RSSI_ERROR_VAL
scan_rssi[key]["stdev"] = RSSI_ERROR_VAL
return scan_rssi
@@ -366,10 +402,26 @@
statistics
"""
connected_rssi = {
- "signal_poll_rssi": [],
- "signal_poll_avg_rssi": [],
- "chain_0_rssi": [],
- "chain_1_rssi": []
+ "signal_poll_rssi": {
+ "data": [],
+ "mean": None,
+ "stdev": None
+ },
+ "signal_poll_avg_rssi": {
+ "data": [],
+ "mean": None,
+ "stdev": None
+ },
+ "chain_0_rssi": {
+ "data": [],
+ "mean": None,
+ "stdev": None
+ },
+ "chain_1_rssi": {
+ "data": [],
+ "mean": None,
+ "stdev": None
+ }
}
for idx in range(num_measurements):
measurement_start_time = time.time()
@@ -379,17 +431,21 @@
if match:
temp_rssi = int(match.group(0).split("=")[1])
if temp_rssi == -9999:
- connected_rssi["signal_poll_rssi"].append(RSSI_ERROR_VAL)
+ connected_rssi["signal_poll_rssi"]["data"].append(
+ RSSI_ERROR_VAL)
else:
- connected_rssi["signal_poll_rssi"].append(temp_rssi)
+ connected_rssi["signal_poll_rssi"]["data"].append(
+ temp_rssi)
else:
- connected_rssi["signal_poll_rssi"].append(RSSI_ERROR_VAL)
+ connected_rssi["signal_poll_rssi"]["data"].append(
+ RSSI_ERROR_VAL)
match = re.search("AVG_RSSI=.*", signal_poll_output)
if match:
- connected_rssi["signal_poll_avg_rssi"].append(
+ connected_rssi["signal_poll_avg_rssi"]["data"].append(
int(match.group(0).split("=")[1]))
else:
- connected_rssi["signal_poll_avg_rssi"].append(RSSI_ERROR_VAL)
+ connected_rssi["signal_poll_avg_rssi"]["data"].append(
+ RSSI_ERROR_VAL)
# Get per chain RSSI
per_chain_rssi = self.dut.adb.shell(STATION_DUMP)
match = re.search(".*signal avg:.*", per_chain_rssi)
@@ -397,33 +453,38 @@
per_chain_rssi = per_chain_rssi[per_chain_rssi.find("[") + 1:
per_chain_rssi.find("]")]
per_chain_rssi = per_chain_rssi.split(", ")
- connected_rssi["chain_0_rssi"].append(int(per_chain_rssi[0]))
- connected_rssi["chain_1_rssi"].append(int(per_chain_rssi[1]))
+ connected_rssi["chain_0_rssi"]["data"].append(
+ int(per_chain_rssi[0]))
+ connected_rssi["chain_1_rssi"]["data"].append(
+ int(per_chain_rssi[1]))
else:
- connected_rssi["chain_0_rssi"].append(RSSI_ERROR_VAL)
- connected_rssi["chain_1_rssi"].append(RSSI_ERROR_VAL)
+ connected_rssi["chain_0_rssi"]["data"].append(RSSI_ERROR_VAL)
+ connected_rssi["chain_1_rssi"]["data"].append(RSSI_ERROR_VAL)
measurement_elapsed_time = time.time() - measurement_start_time
time.sleep(max(0, polling_frequency - measurement_elapsed_time))
# Compute mean RSSIs. Only average valid readings.
# Output RSSI_ERROR_VAL if no valid connected readings found.
for key, val in connected_rssi.copy().items():
- filtered_rssi_values = [x for x in val if not math.isnan(x)]
+ filtered_rssi_values = [
+ x for x in val["data"] if not math.isnan(x)
+ ]
if filtered_rssi_values:
- connected_rssi["mean_{}".format(key)] = sum(
- filtered_rssi_values) / len(filtered_rssi_values)
+ connected_rssi[key]["mean"] = statistics.mean(
+ filtered_rssi_values)
if len(filtered_rssi_values) > 1:
- connected_rssi["stdev_{}".format(key)] = statistics.stdev(
+ connected_rssi[key]["stdev"] = statistics.stdev(
filtered_rssi_values)
else:
- connected_rssi["stdev_{}".format(key)] = 0
+ connected_rssi[key]["stdev"] = 0
else:
- connected_rssi["mean_{}".format(key)] = RSSI_ERROR_VAL
- connected_rssi["stdev_{}".format(key)] = RSSI_ERROR_VAL
+ connected_rssi[key]["mean"] = RSSI_ERROR_VAL
+ connected_rssi[key]["stdev"] = RSSI_ERROR_VAL
return connected_rssi
def rssi_test(self, iperf_traffic, connected_measurements,
- scan_measurements, bssids, polling_frequency):
+ scan_measurements, bssids, polling_frequency,
+ first_measurement_delay):
"""Test function to run RSSI tests.
The function runs an RSSI test in the current device/AP configuration.
@@ -458,16 +519,15 @@
self.attenuators[i].set_atten(atten)
for i in range(self.num_atten)
]
- time.sleep(MED_SLEEP)
+ time.sleep(first_measurement_delay)
current_rssi = {}
- current_rssi["connected_rssi"] = self.get_connected_rssi(
- connected_measurements, polling_frequency)
+ current_rssi = self.get_connected_rssi(connected_measurements,
+ polling_frequency)
current_rssi["scan_rssi"] = self.get_scan_rssi(
bssids, scan_measurements)
rssi_result.append(current_rssi)
self.log.info("Connected RSSI at {0:.2f} dB is {1:.2f} dB".format(
- atten,
- current_rssi["connected_rssi"]["mean_signal_poll_rssi"]))
+ atten, current_rssi["signal_poll_rssi"]["mean"]))
# Stop iperf traffic if needed
if self.iperf_traffic:
self.iperf_server.stop()
@@ -476,7 +536,8 @@
return rssi_result
def rssi_test_func(self, iperf_traffic, connected_measurements,
- scan_measurements, bssids, polling_frequency):
+ scan_measurements, bssids, polling_frequency,
+ first_measurement_delay):
"""Main function to test RSSI.
The function sets up the AP in the correct channel and mode
@@ -490,6 +551,14 @@
rssi_result = {}
# Configure AP
band = self.access_point.band_lookup_by_channel(self.channel)
+ if "2G" in band:
+ frequency = wutils.WifiEnums.channel_2G_to_freq[self.channel]
+ else:
+ frequency = wutils.WifiEnums.channel_5G_to_freq[self.channel]
+ if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
+ self.access_point.set_region(self.testbed_params["DFS_region"])
+ else:
+ self.access_point.set_region(self.testbed_params["default_region"])
self.access_point.set_channel(band, self.channel)
self.access_point.set_bandwidth(band, self.mode)
self.log.info("Access Point Configuration: {}".format(
@@ -504,7 +573,7 @@
wutils.reset_wifi(self.dut)
self.main_network[band]["channel"] = self.channel
wutils.wifi_connect(self.dut, self.main_network[band], num_of_tries=5)
- time.sleep(5)
+ time.sleep(MED_SLEEP)
# Run RvR and log result
rssi_result["test_name"] = self.current_test_name
rssi_result["ap_settings"] = self.access_point.ap_settings.copy()
@@ -523,15 +592,15 @@
"dut_front_end_loss"][str(self.channel)]
rssi_result["rssi_result"] = self.rssi_test(
iperf_traffic, connected_measurements, scan_measurements, bssids,
- polling_frequency)
+ polling_frequency, first_measurement_delay)
self.testclass_results.append(rssi_result)
return rssi_result
def _test_rssi_vs_atten(self):
""" Function that gets called for each test case of rssi_vs_atten
- The function gets called in each rvr test case. The function customizes
- the test based on the test name of the test that called it
+ The function gets called in each rssi test case. The function
+ customizes the test based on the test name of the test that called it
"""
test_params = self.current_test_name.split("_")
self.channel = int(test_params[4][2:])
@@ -539,22 +608,25 @@
self.iperf_traffic = "ActiveTraffic" in test_params[6]
self.iperf_args = '-i 1 -t 3600 -J -R'
band = self.access_point.band_lookup_by_channel(self.channel)
- num_atten_steps = int((self.test_params["rssi_atten_stop"] -
- self.test_params["rssi_atten_start"]) /
- self.test_params["rssi_atten_step"])
+ num_atten_steps = int((self.test_params["rssi_vs_atten_stop"] -
+ self.test_params["rssi_vs_atten_start"]) /
+ self.test_params["rssi_vs_atten_step"])
self.rssi_atten_range = [
- self.test_params["rssi_atten_start"] +
- x * self.test_params["rssi_atten_step"]
+ self.test_params["rssi_vs_atten_start"] +
+ x * self.test_params["rssi_vs_atten_step"]
for x in range(0, num_atten_steps)
]
rssi_result = self.rssi_test_func(
- self.iperf_traffic, self.test_params["connected_measurements"],
- self.test_params["scan_measurements"],
+ self.iperf_traffic,
+ self.test_params["rssi_vs_atten_connected_measurements"],
+ self.test_params["rssi_vs_atten_scan_measurements"],
[self.main_network[band]["BSSID"]],
- self.test_params["polling_frequency"])
- postprocessed_results = self.post_process_rssi_vs_attenuation(
- rssi_result)
- self.pass_fail_check_rssi_vs_attenuation(postprocessed_results)
+ self.test_params["polling_frequency"], MED_SLEEP)
+ postprocessed_results = self.post_process_rssi_sweep(rssi_result)
+ self.plot_rssi_vs_attenuation(postprocessed_results)
+ self.pass_fail_check_rssi_accuracy(
+ postprocessed_results, self.test_params["rssi_vs_atten_metrics"],
+ 1)
def _test_rssi_stability(self):
""" Function that gets called for each test case of rssi_stability
@@ -568,15 +640,60 @@
self.iperf_traffic = "ActiveTraffic" in test_params[5]
self.iperf_args = '-i 1 -t 3600 -J -R'
band = self.access_point.band_lookup_by_channel(self.channel)
- self.rssi_atten_range = self.test_params["stability_test_atten"]
+ self.rssi_atten_range = self.test_params["rssi_stability_atten"]
connected_measurements = int(
- self.test_params["stability_test_duration"] /
+ self.test_params["rssi_stability_duration"] /
self.test_params["polling_frequency"])
rssi_result = self.rssi_test_func(
self.iperf_traffic, connected_measurements, 0,
[self.main_network[band]["BSSID"]],
- self.test_params["polling_frequency"])
- self.pass_fail_check_rssi_stability(rssi_result)
+ self.test_params["polling_frequency"], MED_SLEEP)
+ postprocessed_results = self.post_process_rssi_sweep(rssi_result)
+ self.plot_rssi_vs_time(rssi_result, postprocessed_results, 1)
+ self.pass_fail_check_rssi_stability(postprocessed_results)
+
+ def _test_rssi_tracking(self):
+ """ Function that gets called for each test case of rssi_tracking
+
+ The function gets called in each rssi test case. The function
+ customizes the test based on the test name of the test that called it
+ """
+ test_params = self.current_test_name.split("_")
+ self.channel = int(test_params[3][2:])
+ self.mode = test_params[4]
+ self.iperf_traffic = "ActiveTraffic" in test_params[5]
+ self.iperf_args = '-i 1 -t 3600 -J -R'
+ band = self.access_point.band_lookup_by_channel(self.channel)
+ self.rssi_atten_range = []
+ for waveform in self.test_params["rssi_tracking_waveforms"]:
+ waveform_vector = []
+ for section in range(len(waveform["atten_levels"]) - 1):
+ section_limits = waveform["atten_levels"][section:section + 2]
+ if section_limits[0] < section_limits[1]:
+ waveform_vector = waveform_vector + sorted(
+ list(
+ range(section_limits[0], section_limits[1],
+ waveform["step_size"])) *
+ waveform["step_duration"])
+ else:
+ waveform_vector = waveform_vector + list(
+ reversed(
+ sorted(
+ list(
+ range(section_limits[1], section_limits[0],
+ waveform["step_size"])) *
+ waveform["step_duration"])))
+ waveform_vector = waveform_vector * waveform["repetitions"]
+ self.rssi_atten_range = self.rssi_atten_range + waveform_vector
+ connected_measurements = int(1 / self.test_params["polling_frequency"])
+ rssi_result = self.rssi_test_func(
+ self.iperf_traffic, connected_measurements, 0,
+ [self.main_network[band]["BSSID"]],
+ self.test_params["polling_frequency"], 0)
+ postprocessed_results = self.post_process_rssi_sweep(rssi_result)
+ self.plot_rssi_vs_time(rssi_result, postprocessed_results, 1)
+ self.pass_fail_check_rssi_accuracy(postprocessed_results,
+ ["signal_poll_rssi"], 0)
@test_tracker_info(uuid='519689b8-0a3c-4fd9-9227-fd7962d0f1a0')
def test_rssi_stability_ch1_VHT20_ActiveTraffic(self):
@@ -778,6 +895,14 @@
def test_rssi_vs_atten_ch161_VHT20_ActiveTraffic(self):
self._test_rssi_vs_atten()
+ @test_tracker_info(uuid='')
+ def test_rssi_tracking_ch161_VHT20_ActiveTraffic(self):
+ self._test_rssi_tracking()
+
+ @test_tracker_info(uuid='')
+ def test_rssi_tracking_ch161_VHT20_NoTraffic(self):
+ self._test_rssi_tracking()
+
class WifiRssi_2GHz_ActiveTraffic_Test(WifiRssiTest):
def __init__(self, controllers):
diff --git a/acts/tests/google/wifi/WifiStressTest.py b/acts/tests/google/wifi/WifiStressTest.py
index d891cc2..aee3d3b 100755
--- a/acts/tests/google/wifi/WifiStressTest.py
+++ b/acts/tests/google/wifi/WifiStressTest.py
@@ -53,7 +53,7 @@
req_params = []
opt_param = [
"open_network", "reference_networks", "iperf_server_address",
- "stress_count"]
+ "stress_count", "stress_hours"]
self.unpack_userparams(
req_param_names=req_params, opt_param_names=opt_param)
@@ -85,7 +85,6 @@
def on_fail(self, test_name, begin_time):
self.dut.take_bug_report(test_name, begin_time)
self.dut.cat_adb_log(test_name, begin_time)
- pass
def teardown_class(self):
wutils.reset_wifi(self.dut)
@@ -142,15 +141,20 @@
"""Toggle WiFi state ON and OFF for N times."""
for count in range(self.stress_count):
"""Test toggling wifi"""
- self.log.debug("Going from on to off.")
- wutils.wifi_toggle_state(self.dut, False)
- self.log.debug("Going from off to on.")
- startTime = time.time()
- wutils.wifi_toggle_state(self.dut, True)
- startup_time = time.time() - startTime
- self.log.debug("WiFi was enabled on the device in %s s." % startup_time)
+ try:
+ self.log.debug("Going from on to off.")
+ wutils.wifi_toggle_state(self.dut, False)
+ self.log.debug("Going from off to on.")
+ startTime = time.time()
+ wutils.wifi_toggle_state(self.dut, True)
+ startup_time = time.time() - startTime
+ self.log.debug("WiFi was enabled on the device in %s s." %
+ startup_time)
+ except:
+ signals.TestFailure(details="", extras={"Iterations":"%d" %
+ self.stress_count, "Pass":"%d" %count})
raise signals.TestPass(details="", extras={"Iterations":"%d" %
- (count+1)})
+ self.stress_count, "Pass":"%d" %(count+1)})
@test_tracker_info(uuid="49e3916a-9580-4bf7-a60d-a0f2545dcdde")
def test_stress_connect_traffic_disconnect_5g(self):
@@ -180,9 +184,10 @@
time.sleep(WAIT_BEFORE_CONNECTION)
except:
raise signals.TestFailure("Network connect-disconnect failed."
- "Look at logs", extras={"Iterations":"%d" %((count+1))})
+ "Look at logs", extras={"Iterations":"%d" %
+ self.stress_count, "Pass":"%d" %count})
raise signals.TestPass(details="", extras={"Iterations":"%d" %
- (count+1)})
+ self.stress_count, "Pass":"%d" %(count+1)})
@test_tracker_info(uuid="e9827dff-0755-43ec-8b50-1f9756958460")
def test_stress_connect_long_traffic_5g(self):
@@ -198,8 +203,7 @@
self.scan_and_connect_by_ssid(self.wpa_5g)
# Start IPerf traffic from server to phone.
# Download data for 5 hours.
- #sec = 5*60*60
- sec = 60
+ sec = self.stress_hours * 60 * 60
args = "-p {} -t {} -R".format(self.iperf_server.port, sec)
self.log.info("Running iperf client {}".format(args))
result, data = self.dut.run_iperf_client(self.iperf_server_address,
@@ -209,8 +213,10 @@
self.run_ping(sec)
except:
raise signals.TestFailure("Network long-connect failed."
- "Look at logs", extras={"Seconds":"UNKNOWN"})
- raise signals.TestPass(details="", extras={"Seconds":"%d" %sec})
+ "Look at logs", extras={"Total Hours":"%d" %self.stress_hours,
+ "Seconds Run":"UNKNOWN"})
+ raise signals.TestPass(details="", extras={"Total Hours":"%d" %
+ self.stress_hours, "Seconds":"%d" %sec})
@test_tracker_info(uuid="d367c83e-5b00-4028-9ed8-f7b875997d13")
def test_stress_wifi_failover(self):
@@ -236,14 +242,14 @@
cur_network = self.dut.droid.wifiGetConnectionInfo()
cur_ssid = cur_network[WifiEnums.SSID_KEY]
self.log.info("Cur_ssid = %s" % cur_ssid)
- for count in range(0,len(self.networks)):
+ for i in range(0,len(self.networks)):
self.log.debug("Forget network %s" % cur_ssid)
wutils.wifi_forget_network(self.dut, cur_ssid)
time.sleep(WAIT_FOR_AUTO_CONNECT)
cur_network = self.dut.droid.wifiGetConnectionInfo()
cur_ssid = cur_network[WifiEnums.SSID_KEY]
self.log.info("Cur_ssid = %s" % cur_ssid)
- if count == len(self.networks) - 1:
+ if i == len(self.networks) - 1:
break
if cur_ssid not in ssids:
raise signals.TestFailure("Device did not failover to the "
@@ -253,9 +259,10 @@
if len(network_config):
raise signals.TestFailure("All the network configurations were not "
"removed. Configured networks = %s" % network_config,
- extras={"Iterations":"%d" %((count+1)*4)})
+ extras={"Iterations":"%d" % self.stress_count,
+ "Pass":"%d" %(count*4)})
raise signals.TestPass(details="", extras={"Iterations":"%d" %
- ((count+1)*4)})
+ self.stress_count, "Pass":"%d" %((count+1)*4)})
@test_tracker_info(uuid="2c19e8d1-ac16-4d7e-b309-795144e6b956")
def test_stress_softAP_startup_and_stop_5g(self):
@@ -292,8 +299,10 @@
if initial_wifi_state != cur_wifi_state:
raise signals.TestFailure("Wifi state was %d before softAP and %d now!" %
(initial_wifi_state, cur_wifi_state),
- extras={"Iterations":"%d" %(count+1)})
- raise signals.TestPass(details="", extras={"Iterations":"%d" %(count+1)})
+ extras={"Iterations":"%d" % self.stres_count,
+ "Pass":"%d" %count})
+ raise signals.TestPass(details="", extras={"Iterations":"%d" %
+ self.stress_count, "Pass":"%d" %(count+1)})
@test_tracker_info(uuid="eb22e26b-95d1-4580-8c76-85dfe6a42a0f")
def test_stress_wifi_roaming(self):
@@ -314,6 +323,8 @@
"AP1_on_AP2_off", AP1_network)
except:
raise signals.TestFailure("Roaming failed. Look at logs",
- extras={"Iterations":"%d" %((count+1)*2)})
- raise signals.TestPass(details="", extras={"Iterations":"%d" %((count+1)*2)})
+ extras={"Iterations":"%d" %self.stress_count, "Pass":"%d" %
+ (count*2)})
+ raise signals.TestPass(details="", extras={"Iterations":"%d" %
+ self.stress_count, "Pass":"%d" %((count+1)*2)})