Merge "Add modem crash counters in TelLiveStressTest test report." am: ef5bafc474 am: b4fccf7bf3
am: 28c8600e3f
Change-Id: Ia8cc45c509ed66f012d0c6d0508f8e808eb369ff
diff --git a/acts/framework/acts/controllers/android_device.py b/acts/framework/acts/controllers/android_device.py
index 1bdb9d0..01a6712 100755
--- a/acts/framework/acts/controllers/android_device.py
+++ b/acts/framework/acts/controllers/android_device.py
@@ -1133,6 +1133,23 @@
raise AndroidDeviceError("User window cannot come up")
self.start_services(self.skip_sl4a)
+ def restart_runtime(self):
+ """Restarts android runtime.
+
+ Terminate all sl4a sessions, restarts runtime, wait for framework
+ complete restart, and restart an sl4a session if restart_sl4a is True.
+ """
+ self.stop_services()
+ self.log.info("Restarting android runtime")
+ self.adb.shell("stop")
+ self.adb.shell("start")
+ self.wait_for_boot_completion()
+ self.root_adb()
+ if not self.ensure_screen_on():
+ self.log.error("User window cannot come up")
+ raise AndroidDeviceError("User window cannot come up")
+ self.start_services(self.skip_sl4a)
+
def search_logcat(self, matching_string, begin_time=None):
"""Search logcat message with given string.
diff --git a/acts/framework/acts/controllers/attenuator_lib/_tnhelper.py b/acts/framework/acts/controllers/attenuator_lib/_tnhelper.py
index 2e98ec6..22b0318 100644
--- a/acts/framework/acts/controllers/attenuator_lib/_tnhelper.py
+++ b/acts/framework/acts/controllers/attenuator_lib/_tnhelper.py
@@ -20,7 +20,7 @@
User code shouldn't need to directly access this class.
"""
-
+import logging
import telnetlib
from acts.controllers import attenuator
@@ -44,7 +44,7 @@
def open(self, host, port=23):
if self._tn:
self._tn.close()
-
+ logging.debug("Attenuator IP = %s" % host)
self._tn = telnetlib.Telnet()
self._tn.open(host, port, 10)
diff --git a/acts/framework/acts/test_utils/wifi/WifiBaseTest.py b/acts/framework/acts/test_utils/wifi/WifiBaseTest.py
index bd62853..95c7b95 100755
--- a/acts/framework/acts/test_utils/wifi/WifiBaseTest.py
+++ b/acts/framework/acts/test_utils/wifi/WifiBaseTest.py
@@ -37,7 +37,7 @@
class WifiBaseTest(BaseTestClass):
def __init__(self, controllers):
BaseTestClass.__init__(self, controllers)
- if hasattr(self, 'attenuators'):
+ if hasattr(self, 'attenuators') and self.attenuators:
for attenuator in self.attenuators:
attenuator.set_atten(0)
diff --git a/acts/framework/acts/test_utils/wifi/aware/AwareBaseTest.py b/acts/framework/acts/test_utils/wifi/aware/AwareBaseTest.py
index 0b9192d..1e8f9d0 100644
--- a/acts/framework/acts/test_utils/wifi/aware/AwareBaseTest.py
+++ b/acts/framework/acts/test_utils/wifi/aware/AwareBaseTest.py
@@ -15,6 +15,7 @@
# limitations under the License.
from acts import asserts
+from acts import utils
from acts.base_test import BaseTestClass
from acts.test_utils.wifi import wifi_test_utils as wutils
from acts.test_utils.wifi.aware import aware_const as aconsts
@@ -23,7 +24,7 @@
class AwareBaseTest(BaseTestClass):
def __init__(self, controllers):
- BaseTestClass.__init__(self, controllers)
+ super(AwareBaseTest, self).__init__(controllers)
# message ID counter to make sure all uses are unique
msg_id = 0
@@ -43,6 +44,7 @@
"Device under test does not support Wi-Fi Aware - skipping test")
wutils.wifi_toggle_state(ad, True)
ad.droid.wifiP2pClose()
+ utils.set_location_service(ad, True)
aware_avail = ad.droid.wifiIsAwareAvailable()
if not aware_avail:
self.log.info('Aware not available. Waiting ...')
@@ -52,7 +54,8 @@
self.reset_device_parameters(ad)
self.reset_device_statistics(ad)
self.set_power_mode_parameters(ad)
-
+ utils.set_regulatory_domain(ad, "US")
+ autils.configure_ndp_allow_any_override(ad, True)
def teardown_test(self):
for ad in self.android_devices:
@@ -85,9 +88,9 @@
"""Set the power configuration DW parameters for the device based on any
configuration overrides (if provided)"""
if self.aware_default_power_mode == "INTERACTIVE":
- autils.config_dw_high_power(ad)
+ autils.config_settings_high_power(ad)
elif self.aware_default_power_mode == "NON_INTERACTIVE":
- autils.config_dw_low_power(ad)
+ autils.config_settings_low_power(ad)
else:
asserts.assert_false(
"The 'aware_default_power_mode' configuration must be INTERACTIVE or "
diff --git a/acts/framework/acts/test_utils/wifi/aware/aware_const.py b/acts/framework/acts/test_utils/wifi/aware/aware_const.py
index e94c9bb..c2bd2b9 100644
--- a/acts/framework/acts/test_utils/wifi/aware/aware_const.py
+++ b/acts/framework/acts/test_utils/wifi/aware/aware_const.py
@@ -15,14 +15,21 @@
# limitations under the License.
######################################################
-# Aware DW (Discovery Window) power mode values
+# Aware power settings values for interactive (high power) and
+# non-interactive (low power) modes
######################################################
-DW_24_INTERACTIVE = 1
-DW_5_INTERACTIVE = 1
+POWER_DW_24_INTERACTIVE = 1
+POWER_DW_5_INTERACTIVE = 1
+POWER_DISC_BEACON_INTERVAL_INTERACTIVE = 0
+POWER_NUM_SS_IN_DISC_INTERACTIVE = 0
+POWER_ENABLE_DW_EARLY_TERM_INTERACTIVE = 0
-DW_24_NON_INTERACTIVE = 4
-DW_5_NON_INTERACTIVE = 0
+POWER_DW_24_NON_INTERACTIVE = 4
+POWER_DW_5_NON_INTERACTIVE = 0
+POWER_DISC_BEACON_INTERVAL_NON_INTERACTIVE = 0
+POWER_NUM_SS_IN_DISC_NON_INTERACTIVE = 0
+POWER_ENABLE_DW_EARLY_TERM_NON_INTERACTIVE = 0
######################################################
# Broadcast events
@@ -51,6 +58,9 @@
DISCOVERY_KEY_DISCOVERY_TYPE = "DiscoveryType"
DISCOVERY_KEY_TTL = "TtlSec"
DISCOVERY_KEY_TERM_CB_ENABLED = "TerminateNotificationEnabled"
+DISCOVERY_KEY_RANGING_ENABLED = "RangingEnabled"
+DISCOVERY_KEY_MIN_DISTANCE_MM = "MinDistanceMm"
+DISCOVERY_KEY_MAX_DISTANCE_MM = "MaxDistanceMm"
PUBLISH_TYPE_UNSOLICITED = 0
PUBLISH_TYPE_SOLICITED = 1
@@ -84,6 +94,8 @@
SESSION_CB_ON_SESSION_CONFIG_FAILED = "WifiAwareSessionOnSessionConfigFailed"
SESSION_CB_ON_SESSION_TERMINATED = "WifiAwareSessionOnSessionTerminated"
SESSION_CB_ON_SERVICE_DISCOVERED = "WifiAwareSessionOnServiceDiscovered"
+SESSION_CB_ON_SERVICE_DISCOVERED_WITHIN_RANGE = \
+ "WifiAwareSessionOnServiceDiscoveredWithinRange"
SESSION_CB_ON_MESSAGE_SENT = "WifiAwareSessionOnMessageSent"
SESSION_CB_ON_MESSAGE_SEND_FAILED = "WifiAwareSessionOnMessageSendFailed"
SESSION_CB_ON_MESSAGE_RECEIVED = "WifiAwareSessionOnMessageReceived"
@@ -101,6 +113,7 @@
SESSION_CB_KEY_MESSAGE_AS_STRING = "messageAsString"
SESSION_CB_KEY_LATENCY_MS = "latencyMs"
SESSION_CB_KEY_TIMESTAMP_MS = "timestampMs"
+SESSION_CB_KEY_DISTANCE_MM = "distanceMm"
######################################################
# WifiAwareRangingListener events (RttManager.RttListener)
diff --git a/acts/framework/acts/test_utils/wifi/aware/aware_test_utils.py b/acts/framework/acts/test_utils/wifi/aware/aware_test_utils.py
index 4438064..4be392e 100644
--- a/acts/framework/acts/test_utils/wifi/aware/aware_test_utils.py
+++ b/acts/framework/acts/test_utils/wifi/aware/aware_test_utils.py
@@ -392,74 +392,122 @@
return dut.droid.wifiAwareCreateNetworkSpecifierOob(
id, dev_type, peer_mac, None, sec)
-def configure_dw(device, is_default, is_24_band, value):
- """Use the command-line API to configure the DW (discovery window) setting
+def configure_power_setting(device, mode, name, value):
+ """Use the command-line API to configure the power setting
Args:
device: Device on which to perform configuration
- is_default: True for the default setting, False for the non-interactive
- setting
- is_24_band: True for 2.4GHz band, False for 5GHz band
- value: An integer 0 to 5
+ mode: The power mode being set, should be "default", "inactive", or "idle"
+ name: One of the power settings from 'wifiaware set-power'.
+ value: An integer.
"""
- variable = 'dw_%s_%sghz' % ('default' if is_default else 'on_inactive', '24'
- if is_24_band else '5')
- device.adb.shell("cmd wifiaware native_api set %s %d" % (variable, value))
+ device.adb.shell(
+ "cmd wifiaware native_api set-power %s %s %d" % (mode, name, value))
-def config_dw_high_power(device):
- """Configure device's discovery window (DW) values to high power mode -
+def configure_ndp_allow_any_override(device, override_api_check):
+ """Use the command-line API to configure whether an NDP Responder may be
+ configured to accept an NDP request from ANY peer.
+
+ By default the target API level of the requesting app determines whether such
+ configuration is permitted. This allows overriding the API check and allowing
+ it.
+
+ Args:
+ device: Device on which to perform configuration.
+ override_api_check: True to allow a Responder to ANY configuration, False to
+ perform the API level check.
+ """
+ device.adb.shell("cmd wifiaware state_mgr allow_ndp_any %s" % (
+ "true" if override_api_check else "false"))
+
+def config_settings_high_power(device):
+ """Configure device's power settings values to high power mode -
whether device is in interactive or non-interactive modes"""
- configure_dw(
- device, is_default=True, is_24_band=True, value=aconsts.DW_24_INTERACTIVE)
- configure_dw(
- device, is_default=True, is_24_band=False, value=aconsts.DW_5_INTERACTIVE)
- configure_dw(
- device,
- is_default=False,
- is_24_band=True,
- value=aconsts.DW_24_INTERACTIVE)
- configure_dw(
- device,
- is_default=False,
- is_24_band=False,
- value=aconsts.DW_5_INTERACTIVE)
+ configure_power_setting(device, "default", "dw_24ghz",
+ aconsts.POWER_DW_24_INTERACTIVE)
+ configure_power_setting(device, "default", "dw_5ghz",
+ aconsts.POWER_DW_5_INTERACTIVE)
+ configure_power_setting(device, "default", "disc_beacon_interval_ms",
+ aconsts.POWER_DISC_BEACON_INTERVAL_INTERACTIVE)
+ configure_power_setting(device, "default", "num_ss_in_discovery",
+ aconsts.POWER_NUM_SS_IN_DISC_INTERACTIVE)
+ configure_power_setting(device, "default", "enable_dw_early_term",
+ aconsts.POWER_ENABLE_DW_EARLY_TERM_INTERACTIVE)
-def config_dw_low_power(device):
- """Configure device's discovery window (DW) values to low power mode - whether
+ configure_power_setting(device, "inactive", "dw_24ghz",
+ aconsts.POWER_DW_24_INTERACTIVE)
+ configure_power_setting(device, "inactive", "dw_5ghz",
+ aconsts.POWER_DW_5_INTERACTIVE)
+ configure_power_setting(device, "inactive", "disc_beacon_interval_ms",
+ aconsts.POWER_DISC_BEACON_INTERVAL_INTERACTIVE)
+ configure_power_setting(device, "inactive", "num_ss_in_discovery",
+ aconsts.POWER_NUM_SS_IN_DISC_INTERACTIVE)
+ configure_power_setting(device, "inactive", "enable_dw_early_term",
+ aconsts.POWER_ENABLE_DW_EARLY_TERM_INTERACTIVE)
+
+def config_settings_low_power(device):
+ """Configure device's power settings values to low power mode - whether
device is in interactive or non-interactive modes"""
- configure_dw(
- device,
- is_default=True,
- is_24_band=True,
- value=aconsts.DW_24_NON_INTERACTIVE)
- configure_dw(
- device,
- is_default=True,
- is_24_band=False,
- value=aconsts.DW_5_NON_INTERACTIVE)
- configure_dw(
- device,
- is_default=False,
- is_24_band=True,
- value=aconsts.DW_24_NON_INTERACTIVE)
- configure_dw(
- device,
- is_default=False,
- is_24_band=False,
- value=aconsts.DW_5_NON_INTERACTIVE)
+ configure_power_setting(device, "default", "dw_24ghz",
+ aconsts.POWER_DW_24_NON_INTERACTIVE)
+ configure_power_setting(device, "default", "dw_5ghz",
+ aconsts.POWER_DW_5_NON_INTERACTIVE)
+ configure_power_setting(device, "default", "disc_beacon_interval_ms",
+ aconsts.POWER_DISC_BEACON_INTERVAL_NON_INTERACTIVE)
+ configure_power_setting(device, "default", "num_ss_in_discovery",
+ aconsts.POWER_NUM_SS_IN_DISC_NON_INTERACTIVE)
+ configure_power_setting(device, "default", "enable_dw_early_term",
+ aconsts.POWER_ENABLE_DW_EARLY_TERM_NON_INTERACTIVE)
-def config_dw_all_modes(device, dw_24ghz, dw_5ghz):
+ configure_power_setting(device, "inactive", "dw_24ghz",
+ aconsts.POWER_DW_24_NON_INTERACTIVE)
+ configure_power_setting(device, "inactive", "dw_5ghz",
+ aconsts.POWER_DW_5_NON_INTERACTIVE)
+ configure_power_setting(device, "inactive", "disc_beacon_interval_ms",
+ aconsts.POWER_DISC_BEACON_INTERVAL_NON_INTERACTIVE)
+ configure_power_setting(device, "inactive", "num_ss_in_discovery",
+ aconsts.POWER_NUM_SS_IN_DISC_NON_INTERACTIVE)
+ configure_power_setting(device, "inactive", "enable_dw_early_term",
+ aconsts.POWER_ENABLE_DW_EARLY_TERM_NON_INTERACTIVE)
+
+
+def config_power_settings(device, dw_24ghz, dw_5ghz, disc_beacon_interval=None,
+ num_ss_in_disc=None, enable_dw_early_term=None):
"""Configure device's discovery window (DW) values to the specified values -
whether the device is in interactive or non-interactive mode.
Args:
dw_24ghz: DW interval in the 2.4GHz band.
dw_5ghz: DW interval in the 5GHz band.
+ disc_beacon_interval: The discovery beacon interval (in ms). If None then
+ not set.
+ num_ss_in_disc: Number of spatial streams to use for discovery. If None then
+ not set.
+ enable_dw_early_term: If True then enable early termination of the DW. If
+ None then not set.
"""
- configure_dw(device, is_default=True, is_24_band=True, value=dw_24ghz)
- configure_dw(device, is_default=True, is_24_band=False, value=dw_5ghz)
- configure_dw(device, is_default=False, is_24_band=True, value=dw_24ghz)
- configure_dw(device, is_default=False, is_24_band=False, value=dw_5ghz)
+ configure_power_setting(device, "default", "dw_24ghz", dw_24ghz)
+ configure_power_setting(device, "default", "dw_5ghz", dw_5ghz)
+ configure_power_setting(device, "inactive", "dw_24ghz", dw_24ghz)
+ configure_power_setting(device, "inactive", "dw_5ghz", dw_5ghz)
+
+ if disc_beacon_interval is not None:
+ configure_power_setting(device, "default", "disc_beacon_interval_ms",
+ disc_beacon_interval)
+ configure_power_setting(device, "inactive", "disc_beacon_interval_ms",
+ disc_beacon_interval)
+
+ if num_ss_in_disc is not None:
+ configure_power_setting(device, "default", "num_ss_in_discovery",
+ num_ss_in_disc)
+ configure_power_setting(device, "inactive", "num_ss_in_discovery",
+ num_ss_in_disc)
+
+ if enable_dw_early_term is not None:
+ configure_power_setting(device, "default", "enable_dw_early_term",
+ enable_dw_early_term)
+ configure_power_setting(device, "inactive", "enable_dw_early_term",
+ enable_dw_early_term)
def create_discovery_config(service_name,
d_type,
@@ -495,6 +543,36 @@
config[aconsts.DISCOVERY_KEY_TERM_CB_ENABLED] = term_cb_enable
return config
+def add_ranging_to_pub(p_config, enable_ranging):
+ """Add ranging enabled configuration to a publish configuration (only relevant
+ for publish configuration).
+
+ Args:
+ p_config: The Publish discovery configuration.
+ enable_ranging: True to enable ranging, False to disable.
+ Returns:
+ The modified publish configuration.
+ """
+ p_config[aconsts.DISCOVERY_KEY_RANGING_ENABLED] = enable_ranging
+ return p_config
+
+def add_ranging_to_sub(s_config, min_distance_mm, max_distance_mm):
+ """Add ranging distance configuration to a subscribe configuration (only
+ relevant to a subscribe configuration).
+
+ Args:
+ s_config: The Subscribe discovery configuration.
+ min_distance_mm, max_distance_mm: The min and max distance specification.
+ Used if not None.
+ Returns:
+ The modified subscribe configuration.
+ """
+ if min_distance_mm is not None:
+ s_config[aconsts.DISCOVERY_KEY_MIN_DISTANCE_MM] = min_distance_mm
+ if max_distance_mm is not None:
+ s_config[aconsts.DISCOVERY_KEY_MAX_DISTANCE_MM] = max_distance_mm
+ return s_config
+
def attach_with_identity(dut):
"""Start an Aware session (attach) and wait for confirmation and identity
information (mac address).
diff --git a/acts/framework/acts/test_utils/wifi/rtt/RttBaseTest.py b/acts/framework/acts/test_utils/wifi/rtt/RttBaseTest.py
new file mode 100644
index 0000000..9a07e69
--- /dev/null
+++ b/acts/framework/acts/test_utils/wifi/rtt/RttBaseTest.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - Google
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from acts import asserts
+from acts import utils
+from acts.base_test import BaseTestClass
+from acts.test_utils.wifi import wifi_test_utils as wutils
+from acts.test_utils.wifi.rtt import rtt_const as rconsts
+from acts.test_utils.wifi.rtt import rtt_test_utils as rutils
+
+
+class RttBaseTest(BaseTestClass):
+
+ def __init__(self, controllers):
+ super(RttBaseTest, self).__init__(controllers)
+
+ def setup_test(self):
+ required_params = ("lci_reference", "lcr_reference",
+ "rtt_reference_distance_mm",
+ "rtt_reference_distance_margin_mm",
+ "rtt_max_failure_rate_two_sided_rtt_percentage",
+ "rtt_max_failure_rate_one_sided_rtt_percentage",
+ "rtt_max_margin_exceeded_rate_two_sided_rtt_percentage",
+ "rtt_max_margin_exceeded_rate_one_sided_rtt_percentage",
+ "rtt_min_expected_rssi_dbm",
+ "stress_test_min_iteration_count",
+ "stress_test_target_run_time_sec")
+ self.unpack_userparams(required_params)
+
+ for ad in self.android_devices:
+ utils.set_location_service(ad, True)
+ asserts.skip_if(
+ not ad.droid.doesDeviceSupportWifiRttFeature(),
+ "Device under test does not support Wi-Fi RTT - skipping test")
+ wutils.wifi_toggle_state(ad, True)
+ rtt_avail = ad.droid.wifiIsRttAvailable()
+ if not rtt_avail:
+ self.log.info('RTT not available. Waiting ...')
+ rutils.wait_for_event(ad, rconsts.BROADCAST_WIFI_RTT_AVAILABLE)
+ ad.ed.clear_all_events()
+ rutils.config_privilege_override(ad, False)
+
+ def teardown_test(self):
+ for ad in self.android_devices:
+ if not ad.droid.doesDeviceSupportWifiRttFeature():
+ return
+
+ # clean-up queue from the System Service UID
+ ad.droid.wifiRttCancelRanging([1000])
diff --git a/acts/framework/acts/test_utils/wifi/rtt/__init__.py b/acts/framework/acts/test_utils/wifi/rtt/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/acts/framework/acts/test_utils/wifi/rtt/__init__.py
diff --git a/acts/framework/acts/test_utils/wifi/rtt/rtt_const.py b/acts/framework/acts/test_utils/wifi/rtt/rtt_const.py
new file mode 100644
index 0000000..4779f0c
--- /dev/null
+++ b/acts/framework/acts/test_utils/wifi/rtt/rtt_const.py
@@ -0,0 +1,57 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - Google
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+######################################################
+# Broadcast events
+######################################################
+BROADCAST_WIFI_RTT_AVAILABLE = "WifiRttAvailable"
+BROADCAST_WIFI_RTT_NOT_AVAILABLE = "WifiRttNotAvailable"
+
+######################################################
+# RangingResultCallback events
+######################################################
+EVENT_CB_RANGING_ON_FAIL = "WifiRttRangingFailure"
+EVENT_CB_RANGING_ON_RESULT = "WifiRttRangingResults"
+
+EVENT_CB_RANGING_KEY_RESULTS = "Results"
+
+EVENT_CB_RANGING_KEY_STATUS = "status"
+EVENT_CB_RANGING_KEY_DISTANCE_MM = "distanceMm"
+EVENT_CB_RANGING_KEY_DISTANCE_STD_DEV_MM = "distanceStdDevMm"
+EVENT_CB_RANGING_KEY_RSSI = "rssi"
+EVENT_CB_RANGING_KEY_LCI = "lci"
+EVENT_CB_RANGING_KEY_LCR = "lcr"
+EVENT_CB_RANGING_KEY_TIMESTAMP = "timestamp"
+EVENT_CB_RANGING_KEY_MAC = "mac"
+EVENT_CB_RANGING_KEY_PEER_ID = "peerId"
+EVENT_CB_RANGING_KEY_MAC_AS_STRING = "macAsString"
+
+EVENT_CB_RANGING_STATUS_SUCCESS = 0
+EVENT_CB_RANGING_STATUS_FAIL = 1
+EVENT_CB_RANGING_STATUS_RESPONDER_DOES_NOT_SUPPORT_IEEE80211MC = 2
+
+######################################################
+# status codes
+######################################################
+
+RANGING_FAIL_CODE_GENERIC = 1
+RANGING_FAIL_CODE_RTT_NOT_AVAILABLE = 2
+
+######################################################
+# ScanResults keys
+######################################################
+
+SCAN_RESULT_KEY_RTT_RESPONDER = "is80211McRTTResponder"
\ No newline at end of file
diff --git a/acts/framework/acts/test_utils/wifi/rtt/rtt_test_utils.py b/acts/framework/acts/test_utils/wifi/rtt/rtt_test_utils.py
new file mode 100644
index 0000000..f17996b
--- /dev/null
+++ b/acts/framework/acts/test_utils/wifi/rtt/rtt_test_utils.py
@@ -0,0 +1,410 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - Google
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import queue
+import statistics
+import time
+
+from acts import asserts
+from acts.test_utils.wifi import wifi_test_utils as wutils
+from acts.test_utils.wifi.rtt import rtt_const as rconsts
+
+# arbitrary timeout for events
+EVENT_TIMEOUT = 10
+
+
+def decorate_event(event_name, id):
+ return '%s_%d' % (event_name, id)
+
+
+def wait_for_event(ad, event_name, timeout=EVENT_TIMEOUT):
+ """Wait for the specified event or timeout.
+
+ Args:
+ ad: The android device
+ event_name: The event to wait on
+ timeout: Number of seconds to wait
+ Returns:
+ The event (if available)
+ """
+ prefix = ''
+ if hasattr(ad, 'pretty_name'):
+ prefix = '[%s] ' % ad.pretty_name
+ try:
+ event = ad.ed.pop_event(event_name, timeout)
+ ad.log.info('%s%s: %s', prefix, event_name, event['data'])
+ return event
+ except queue.Empty:
+ ad.log.info('%sTimed out while waiting for %s', prefix, event_name)
+ asserts.fail(event_name)
+
+def fail_on_event(ad, event_name, timeout=EVENT_TIMEOUT):
+ """Wait for a timeout period and looks for the specified event - fails if it
+ is observed.
+
+ Args:
+ ad: The android device
+ event_name: The event to wait for (and fail on its appearance)
+ """
+ prefix = ''
+ if hasattr(ad, 'pretty_name'):
+ prefix = '[%s] ' % ad.pretty_name
+ try:
+ event = ad.ed.pop_event(event_name, timeout)
+ ad.log.info('%sReceived unwanted %s: %s', prefix, event_name, event['data'])
+ asserts.fail(event_name, extras=event)
+ except queue.Empty:
+ ad.log.info('%s%s not seen (as expected)', prefix, event_name)
+ return
+
+
+def config_privilege_override(dut, override_to_no_privilege):
+ """Configure the device to override the permission check and to disallow any
+ privileged RTT operations, e.g. disallow one-sided RTT to Responders (APs)
+ which do not support IEEE 802.11mc.
+
+ Args:
+ dut: Device to configure.
+ override_to_no_privilege: True to indicate no privileged ops, False for
+ default (which will allow privileged ops).
+ """
+ dut.adb.shell("cmd wifirtt set override_assume_no_privilege %d" % (
+ 1 if override_to_no_privilege else 0))
+
+
+def get_rtt_constrained_results(scanned_networks, support_rtt):
+ """Filter the input list and only return those networks which either support
+ or do not support RTT (IEEE 802.11mc.)
+
+ Args:
+ scanned_networks: A list of networks from scan results.
+ support_rtt: True - only return those APs which support RTT, False - only
+ return those APs which do not support RTT.
+
+ Returns: a sub-set of the scanned_networks per support_rtt constraint.
+ """
+ matching_networks = []
+ for network in scanned_networks:
+ if support_rtt:
+ if (rconsts.SCAN_RESULT_KEY_RTT_RESPONDER in network and
+ network[rconsts.SCAN_RESULT_KEY_RTT_RESPONDER]):
+ matching_networks.append(network)
+ else:
+ if (rconsts.SCAN_RESULT_KEY_RTT_RESPONDER not in network or
+ not network[rconsts.SCAN_RESULT_KEY_RTT_RESPONDER]):
+ matching_networks.append(network)
+
+ return matching_networks
+
+
+def scan_networks(dut):
+ """Perform a scan and return scan results.
+
+ Args:
+ dut: Device under test.
+
+ Returns: an array of scan results.
+ """
+ wutils.start_wifi_connection_scan(dut)
+ return dut.droid.wifiGetScanResults()
+
+
+def scan_with_rtt_support_constraint(dut, support_rtt, repeat=0):
+ """Perform a scan and return scan results of APs: only those that support or
+ do not support RTT (IEEE 802.11mc) - per the support_rtt parameter.
+
+ Args:
+ dut: Device under test.
+ support_rtt: True - only return those APs which support RTT, False - only
+ return those APs which do not support RTT.
+ repeat: Re-scan this many times to find an RTT supporting network.
+
+ Returns: an array of scan results.
+ """
+ for i in range(repeat + 1):
+ scan_results = scan_networks(dut)
+ aps = get_rtt_constrained_results(scan_results, support_rtt)
+ if len(aps) != 0:
+ return aps
+
+ return []
+
+
+def validate_ap_result(scan_result, range_result):
+ """Validate the range results:
+ - Successful if AP (per scan result) support 802.11mc (allowed to fail
+ otherwise)
+ - MAC of result matches the BSSID
+
+ Args:
+ scan_result: Scan result for the AP
+ range_result: Range result returned by the RTT API
+ """
+ asserts.assert_equal(scan_result[wutils.WifiEnums.BSSID_KEY], range_result[
+ rconsts.EVENT_CB_RANGING_KEY_MAC_AS_STRING_BSSID], 'MAC/BSSID mismatch')
+ if (rconsts.SCAN_RESULT_KEY_RTT_RESPONDER in scan_result and
+ scan_result[rconsts.SCAN_RESULT_KEY_RTT_RESPONDER]):
+ asserts.assert_true(range_result[rconsts.EVENT_CB_RANGING_KEY_STATUS] ==
+ rconsts.EVENT_CB_RANGING_STATUS_SUCCESS,
+ 'Ranging failed for an AP which supports 802.11mc!')
+
+
+def validate_ap_results(scan_results, range_results):
+ """Validate an array of ranging results against the scan results used to
+ trigger the range. The assumption is that the results are returned in the
+ same order as the request (which were the scan results).
+
+ Args:
+ scan_results: Scans results used to trigger the range request
+ range_results: Range results returned by the RTT API
+ """
+ asserts.assert_equal(
+ len(scan_results),
+ len(range_results),
+ 'Mismatch in length of scan results and range results')
+
+ # sort first based on BSSID/MAC
+ scan_results.sort(key=lambda x: x[wutils.WifiEnums.BSSID_KEY])
+ range_results.sort(
+ key=lambda x: x[rconsts.EVENT_CB_RANGING_KEY_MAC_AS_STRING_BSSID])
+
+ for i in range(len(scan_results)):
+ validate_ap_result(scan_results[i], range_results[i])
+
+
+def validate_aware_mac_result(range_result, mac, description):
+ """Validate the range result for an Aware peer specified with a MAC address:
+ - Correct MAC address.
+
+ The MAC addresses may contain ":" (which are ignored for the comparison) and
+ may be in any case (which is ignored for the comparison).
+
+ Args:
+ range_result: Range result returned by the RTT API
+ mac: MAC address of the peer
+ description: Additional content to print on failure
+ """
+ mac1 = mac.replace(':', '').lower()
+ mac2 = range_result[rconsts.EVENT_CB_RANGING_KEY_MAC_AS_STRING].replace(':',
+ '').lower()
+ asserts.assert_equal(mac1, mac2,
+ '%s: MAC mismatch' % description)
+
+def validate_aware_peer_id_result(range_result, peer_id, description):
+ """Validate the range result for An Aware peer specified with a Peer ID:
+ - Correct Peer ID
+ - MAC address information not available
+
+ Args:
+ range_result: Range result returned by the RTT API
+ peer_id: Peer ID of the peer
+ description: Additional content to print on failure
+ """
+ asserts.assert_equal(peer_id,
+ range_result[rconsts.EVENT_CB_RANGING_KEY_PEER_ID],
+ '%s: Peer Id mismatch' % description)
+ asserts.assert_false(rconsts.EVENT_CB_RANGING_KEY_MAC in range_result,
+ '%s: MAC Address not empty!' % description)
+
+
+def extract_stats(results, range_reference_mm, range_margin_mm, min_rssi,
+ reference_lci=[], reference_lcr=[], summary_only=False):
+ """Extract statistics from a list of RTT results. Returns a dictionary
+ with results:
+ - num_results (success or fails)
+ - num_success_results
+ - num_no_results (e.g. timeout)
+ - num_failures
+ - num_range_out_of_margin (only for successes)
+ - num_invalid_rssi (only for successes)
+ - distances: extracted list of distances
+ - distance_std_devs: extracted list of distance standard-deviations
+ - rssis: extracted list of RSSI
+ - distance_mean
+ - distance_std_dev (based on distance - ignoring the individual std-devs)
+ - rssi_mean
+ - rssi_std_dev
+ - status_codes
+ - lcis: extracted list of all of the individual LCI
+ - lcrs: extracted list of all of the individual LCR
+ - any_lci_mismatch: True/False - checks if all LCI results are identical to
+ the reference LCI.
+ - any_lcr_mismatch: True/False - checks if all LCR results are identical to
+ the reference LCR.
+
+ Args:
+ results: List of RTT results.
+ range_reference_mm: Reference value for the distance (in mm)
+ range_margin_mm: Acceptable absolute margin for distance (in mm)
+ min_rssi: Acceptable minimum RSSI value.
+ reference_lci, reference_lcr: Reference values for LCI and LCR.
+ summary_only: Only include summary keys (reduce size).
+
+ Returns: A dictionary of stats.
+ """
+ stats = {}
+ stats['num_results'] = 0
+ stats['num_success_results'] = 0
+ stats['num_no_results'] = 0
+ stats['num_failures'] = 0
+ stats['num_range_out_of_margin'] = 0
+ stats['num_invalid_rssi'] = 0
+ stats['any_lci_mismatch'] = False
+ stats['any_lcr_mismatch'] = False
+
+ range_max_mm = range_reference_mm + range_margin_mm
+ range_min_mm = range_reference_mm - range_margin_mm
+
+ distances = []
+ distance_std_devs = []
+ rssis = []
+ status_codes = []
+ lcis = []
+ lcrs = []
+
+ for i in range(len(results)):
+ result = results[i]
+
+ if result is None: # None -> timeout waiting for RTT result
+ stats['num_no_results'] = stats['num_no_results'] + 1
+ continue
+ stats['num_results'] = stats['num_results'] + 1
+
+ status_codes.append(result[rconsts.EVENT_CB_RANGING_KEY_STATUS])
+ if status_codes[-1] != rconsts.EVENT_CB_RANGING_STATUS_SUCCESS:
+ stats['num_failures'] = stats['num_failures'] + 1
+ continue
+ stats['num_success_results'] = stats['num_success_results'] + 1
+
+ distance_mm = result[rconsts.EVENT_CB_RANGING_KEY_DISTANCE_MM]
+ distances.append(distance_mm)
+ if not range_min_mm <= distance_mm <= range_max_mm:
+ stats['num_range_out_of_margin'] = stats['num_range_out_of_margin'] + 1
+ distance_std_devs.append(
+ result[rconsts.EVENT_CB_RANGING_KEY_DISTANCE_STD_DEV_MM])
+
+ rssi = result[rconsts.EVENT_CB_RANGING_KEY_RSSI]
+ rssis.append(rssi)
+ if not min_rssi <= rssi <= 0:
+ stats['num_invalid_rssi'] = stats['num_invalid_rssi'] + 1
+
+ lcis.append(result[rconsts.EVENT_CB_RANGING_KEY_LCI])
+ if (result[rconsts.EVENT_CB_RANGING_KEY_LCI] != reference_lci):
+ stats['any_lci_mismatch'] = True
+ lcrs.append(result[rconsts.EVENT_CB_RANGING_KEY_LCR])
+ if (result[rconsts.EVENT_CB_RANGING_KEY_LCR] != reference_lcr):
+ stats['any_lcr_mismatch'] = True
+
+ if len(distances) > 0:
+ stats['distance_mean'] = statistics.mean(distances)
+ if len(distances) > 1:
+ stats['distance_std_dev'] = statistics.stdev(distances)
+ if len(rssis) > 0:
+ stats['rssi_mean'] = statistics.mean(rssis)
+ if len(rssis) > 1:
+ stats['rssi_std_dev'] = statistics.stdev(rssis)
+ if not summary_only:
+ stats['distances'] = distances
+ stats['distance_std_devs'] = distance_std_devs
+ stats['rssis'] = rssis
+ stats['status_codes'] = status_codes
+ stats['lcis'] = lcis
+ stats['lcrs'] = lcrs
+
+ return stats
+
+
+def run_ranging(dut, aps, iter_count, time_between_iterations,
+ target_run_time_sec=0):
+ """Executing ranging to the set of APs.
+
+ Will execute a minimum of 'iter_count' iterations. Will continue to run
+ until execution time (just) exceeds 'target_run_time_sec'.
+
+ Args:
+ dut: Device under test
+ aps: A list of APs (Access Points) to range to.
+ iter_count: (Minimum) Number of measurements to perform.
+ time_between_iterations: Number of seconds to wait between iterations.
+ target_run_time_sec: The target run time in seconds.
+
+ Returns: a list of the events containing the RTT results (or None for a
+ failed measurement).
+ """
+ max_peers = dut.droid.wifiRttMaxPeersInRequest()
+
+ asserts.assert_true(len(aps) > 0, "Need at least one AP!")
+ if len(aps) > max_peers:
+ aps = aps[0:max_peers]
+
+ events = {} # need to keep track per BSSID!
+ for ap in aps:
+ events[ap["BSSID"]] = []
+
+ start_clock = time.time()
+ iterations_done = 0
+ run_time = 0
+ while iterations_done < iter_count or (
+ target_run_time_sec != 0 and run_time < target_run_time_sec):
+ if iterations_done != 0 and time_between_iterations != 0:
+ time.sleep(time_between_iterations)
+
+ id = dut.droid.wifiRttStartRangingToAccessPoints(aps)
+ try:
+ event = dut.ed.pop_event(
+ decorate_event(rconsts.EVENT_CB_RANGING_ON_RESULT, id), EVENT_TIMEOUT)
+ range_results = event["data"][rconsts.EVENT_CB_RANGING_KEY_RESULTS]
+ asserts.assert_equal(
+ len(aps),
+ len(range_results),
+ 'Mismatch in length of scan results and range results')
+ for result in range_results:
+ bssid = result[rconsts.EVENT_CB_RANGING_KEY_MAC_AS_STRING]
+ asserts.assert_true(bssid in events,
+ "Result BSSID %s not in requested AP!?" % bssid)
+ asserts.assert_equal(len(events[bssid]), iterations_done,
+ "Duplicate results for BSSID %s!?" % bssid)
+ events[bssid].append(result)
+ except queue.Empty:
+ for ap in aps:
+ events[ap["BSSID"]].append(None)
+
+ iterations_done = iterations_done + 1
+ run_time = time.time() - start_clock
+
+ return events
+
+
+def analyze_results(all_aps_events, rtt_reference_distance_mm,
+ distance_margin_mm, min_expected_rssi, lci_reference, lcr_reference,
+ summary_only=False):
+ """Verifies the results of the RTT experiment.
+
+ Args:
+ all_aps_events: Dictionary of APs, each a list of RTT result events.
+ rtt_reference_distance_mm: Expected distance to the AP (source of truth).
+ distance_margin_mm: Accepted error marging in distance measurement.
+ min_expected_rssi: Minimum acceptable RSSI value
+ lci_reference, lcr_reference: Expected LCI/LCR values (arrays of bytes).
+ summary_only: Only include summary keys (reduce size).
+ """
+ all_stats = {}
+ for bssid, events in all_aps_events.items():
+ stats = extract_stats(events, rtt_reference_distance_mm,
+ distance_margin_mm, min_expected_rssi,
+ lci_reference, lcr_reference, summary_only)
+ all_stats[bssid] = stats
+ return all_stats
diff --git a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py
index f48ec92..b6ce1ed 100755
--- a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py
+++ b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py
@@ -50,6 +50,7 @@
PWD_KEY = "password"
frequency_key = "frequency"
APBAND_KEY = "apBand"
+ HIDDEN_KEY = "hiddenSSID"
WIFI_CONFIG_APBAND_2G = 0
WIFI_CONFIG_APBAND_5G = 1
@@ -688,6 +689,7 @@
Args:
ad: An AndroidDevice object.
"""
+ ad.ed.clear_all_events()
ad.droid.wifiStartScan()
try:
ad.ed.pop_event("WifiManagerScanResultsAvailable", 60)
@@ -695,6 +697,99 @@
asserts.fail("Wi-Fi results did not become available within 60s.")
+def start_wifi_connection_scan_and_return_status(ad):
+ """
+ Starts a wifi connection scan and wait for results to become available
+ or a scan failure to be reported.
+
+ Args:
+ ad: An AndroidDevice object.
+ Returns:
+ True: if scan succeeded & results are available
+ False: if scan failed
+ """
+ ad.ed.clear_all_events()
+ ad.droid.wifiStartScan()
+ try:
+ events = ad.ed.pop_events(
+ "WifiManagerScan(ResultsAvailable|Failure)", 60)
+ except Empty:
+ asserts.fail(
+ "Wi-Fi scan results/failure did not become available within 60s.")
+ # If there are multiple matches, we check for atleast one success.
+ for event in events:
+ if event["name"] == "WifiManagerScanResultsAvailable":
+ return True
+ elif event["name"] == "WifiManagerScanFailure":
+ ad.log.debug("Scan failure received")
+ return False
+
+
+def start_wifi_connection_scan_and_check_for_network(ad, network_ssid,
+ max_tries=3):
+ """
+ Start connectivity scans & checks if the |network_ssid| is seen in
+ scan results. The method performs a max of |max_tries| connectivity scans
+ to find the network.
+
+ Args:
+ ad: An AndroidDevice object.
+ network_ssid: SSID of the network we are looking for.
+ max_tries: Number of scans to try.
+ Returns:
+ True: if network_ssid is found in scan results.
+ False: if network_ssid is not found in scan results.
+ """
+ for num_tries in range(max_tries):
+ if start_wifi_connection_scan_and_return_status(ad):
+ scan_results = ad.droid.wifiGetScanResults()
+ match_results = match_networks(
+ {WifiEnums.SSID_KEY: network_ssid}, scan_results)
+ if len(match_results) > 0:
+ return True
+ return False
+
+
+def start_wifi_connection_scan_and_ensure_network_found(ad, network_ssid,
+ max_tries=3):
+ """
+ Start connectivity scans & ensure the |network_ssid| is seen in
+ scan results. The method performs a max of |max_tries| connectivity scans
+ to find the network.
+ This method asserts on failure!
+
+ Args:
+ ad: An AndroidDevice object.
+ network_ssid: SSID of the network we are looking for.
+ max_tries: Number of scans to try.
+ """
+ ad.log.info("Starting scans to ensure %s is present", network_ssid)
+ assert_msg = "Failed to find " + network_ssid + " in scan results" \
+ " after " + str(max_tries) + " tries"
+ asserts.assert_true(start_wifi_connection_scan_and_check_for_network(
+ ad, network_ssid, max_tries), assert_msg)
+
+
+def start_wifi_connection_scan_and_ensure_network_not_found(ad, network_ssid,
+ max_tries=3):
+ """
+ Start connectivity scans & ensure the |network_ssid| is not seen in
+ scan results. The method performs a max of |max_tries| connectivity scans
+ to find the network.
+ This method asserts on failure!
+
+ Args:
+ ad: An AndroidDevice object.
+ network_ssid: SSID of the network we are looking for.
+ max_tries: Number of scans to try.
+ """
+ ad.log.info("Starting scans to ensure %s is not present", network_ssid)
+ assert_msg = "Found " + network_ssid + " in scan results" \
+ " after " + str(max_tries) + " tries"
+ asserts.assert_false(start_wifi_connection_scan_and_check_for_network(
+ ad, network_ssid, max_tries), assert_msg)
+
+
def start_wifi_background_scan(ad, scan_setting):
"""Starts wifi background scan.
@@ -711,7 +806,7 @@
return event['data']
-def start_wifi_tethering(ad, ssid, password, band=None):
+def start_wifi_tethering(ad, ssid, password, band=None, hidden=None):
"""Starts wifi tethering on an android_device.
Args:
@@ -720,6 +815,7 @@
password: The password the soft AP should use.
band: The band the soft AP should be set on. It should be either
WifiEnums.WIFI_CONFIG_APBAND_2G or WifiEnums.WIFI_CONFIG_APBAND_5G.
+ hidden: boolean to indicate if the AP needs to be hidden or not.
Returns:
No return value. Error checks in this function will raise test failure signals
@@ -729,6 +825,8 @@
config[WifiEnums.PWD_KEY] = password
if band:
config[WifiEnums.APBAND_KEY] = band
+ if hidden:
+ config[WifiEnums.HIDDEN_KEY] = hidden
asserts.assert_true(
ad.droid.wifiSetWifiApConfiguration(config),
"Failed to update WifiAp Configuration")
@@ -925,7 +1023,7 @@
ad.droid.wifiStartTrackingStateChange()
event = ad.ed.pop_event("WifiNetworkDisconnected", 10)
ad.droid.wifiStopTrackingStateChange()
- except queue.Empty:
+ except Empty:
raise signals.TestFailure("Device did not disconnect from the network")
@@ -935,13 +1033,8 @@
Args:
params: A tuple of network info and AndroidDevice object.
"""
- droid = ad.droid
- ed = ad.ed
- SSID = network[WifiEnums.SSID_KEY]
- ed.clear_all_events()
- start_wifi_connection_scan(ad)
- scan_results = droid.wifiGetScanResults()
- assert_network_in_list({WifiEnums.SSID_KEY: SSID}, scan_results)
+ start_wifi_connection_scan_and_ensure_network_found(
+ ad, network[WifiEnums.SSID_KEY])
wifi_connect(ad, network, num_of_tries=3)
@@ -956,12 +1049,8 @@
False otherwise.
"""
- ad.ed.clear_all_events()
- start_wifi_connection_scan(ad)
- scan_results = ad.droid.wifiGetScanResults()
- assert_network_in_list({
- WifiEnums.SSID_KEY: network_ssid
- }, scan_results)
+ start_wifi_connection_scan_and_ensure_network_found(
+ ad, network[WifiEnums.SSID_KEY])
wifi_connect_by_id(ad, network_id)
connect_data = ad.droid.wifiGetConnectionInfo()
connect_ssid = connect_data[WifiEnums.SSID_KEY]
diff --git a/acts/tests/google/wifi/WifiAutoUpdateTest.py b/acts/tests/google/wifi/WifiAutoUpdateTest.py
index f91c18d..4a8ab7b 100755
--- a/acts/tests/google/wifi/WifiAutoUpdateTest.py
+++ b/acts/tests/google/wifi/WifiAutoUpdateTest.py
@@ -20,7 +20,7 @@
import time
import acts.base_test
-import acts.signals
+import acts.signals as signals
import acts.test_utils.wifi.wifi_test_utils as wutils
import acts.utils
diff --git a/acts/tests/google/wifi/WifiCrashTest.py b/acts/tests/google/wifi/WifiCrashTest.py
new file mode 100755
index 0000000..18b2163
--- /dev/null
+++ b/acts/tests/google/wifi/WifiCrashTest.py
@@ -0,0 +1,177 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2018 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import itertools
+import pprint
+import queue
+import time
+
+import acts.base_test
+import acts.signals as signals
+import acts.test_utils.wifi.wifi_test_utils as wutils
+import acts.utils
+
+from acts import asserts
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest
+
+WifiEnums = wutils.WifiEnums
+# Default timeout used for reboot, toggle WiFi and Airplane mode,
+# for the system to settle down after the operation.
+DEFAULT_TIMEOUT = 10
+WIFICOND_KILL_SHELL_COMMAND = "killall wificond"
+WIFI_VENDOR_HAL_KILL_SHELL_COMMAND = "killall android.hardware.wifi@1.0-service"
+SUPPLICANT_KILL_SHELL_COMMAND = "killall wpa_supplicant"
+
+class WifiCrashTest(WifiBaseTest):
+ """Crash Tests for wifi stack.
+
+ Test Bed Requirement:
+ * One Android device
+ * One Wi-Fi network visible to the device.
+ """
+
+ def __init__(self, controllers):
+ WifiBaseTest.__init__(self, controllers)
+
+ def setup_class(self):
+ self.dut = self.android_devices[0]
+ wutils.wifi_test_device_init(self.dut)
+ req_params = []
+ opt_param = ["reference_networks"]
+ self.unpack_userparams(
+ req_param_names=req_params, opt_param_names=opt_param)
+
+ if "AccessPoint" in self.user_params:
+ self.legacy_configure_ap_and_start()
+
+ asserts.assert_true(
+ len(self.reference_networks) > 0,
+ "Need at least one reference network with psk.")
+ self.network = self.reference_networks[0]["2g"]
+
+ def setup_test(self):
+ self.dut.droid.wakeLockAcquireBright()
+ self.dut.droid.wakeUpNow()
+ wutils.wifi_toggle_state(self.dut, True)
+
+ def teardown_test(self):
+ self.dut.droid.wakeLockRelease()
+ self.dut.droid.goToSleepNow()
+ wutils.reset_wifi(self.dut)
+
+ def on_fail(self, test_name, begin_time):
+ self.dut.take_bug_report(test_name, begin_time)
+ self.dut.cat_adb_log(test_name, begin_time)
+
+ def teardown_class(self):
+ if "AccessPoint" in self.user_params:
+ del self.user_params["reference_networks"]
+
+ """Helper Functions"""
+
+ """Tests"""
+ @test_tracker_info(uuid="")
+ def test_wifi_framework_crash_reconnect(self):
+ """Connect to a network, crash framework, then ensure
+ we connect back to the previously connected network.
+
+ Steps:
+ 1. Connect to a network.
+ 2. Restart framework.
+ 3. Reconnect to the previous network.
+
+ """
+ wutils.wifi_connect(self.dut, self.network, num_of_tries=3)
+ # Restart framework
+ self.log.info("Crashing framework")
+ self.dut.restart_runtime()
+ # We won't get the disconnect broadcast because framework crashed.
+ # wutils.wait_for_disconnect(self.dut)
+ time.sleep(DEFAULT_TIMEOUT)
+ wifi_info = self.dut.droid.wifiGetConnectionInfo()
+ if wifi_info[WifiEnums.SSID_KEY] != self.network[WifiEnums.SSID_KEY]:
+ raise signals.TestFailure("Device did not connect to the"
+ " network after crashing framework.")
+
+ @test_tracker_info(uuid="")
+ def test_wifi_cond_crash_reconnect(self):
+ """Connect to a network, crash wificond, then ensure
+ we connect back to the previously connected network.
+
+ Steps:
+ 1. Connect to a network.
+ 2. Crash wificond.
+ 3. Ensure we get a disconnect.
+ 4. Ensure we reconnect to the previous network.
+
+ """
+ wutils.wifi_connect(self.dut, self.network, num_of_tries=3)
+ # Restart wificond
+ self.log.info("Crashing wificond")
+ self.dut.adb.shell(WIFICOND_KILL_SHELL_COMMAND)
+ wutils.wait_for_disconnect(self.dut)
+ time.sleep(DEFAULT_TIMEOUT)
+ wifi_info = self.dut.droid.wifiGetConnectionInfo()
+ if wifi_info[WifiEnums.SSID_KEY] != self.network[WifiEnums.SSID_KEY]:
+ raise signals.TestFailure("Device did not connect to the"
+ " network after crashing wificond.")
+
+ @test_tracker_info(uuid="")
+ def test_wifi_vendorhal_crash_reconnect(self):
+ """Connect to a network, crash wifi HAL, then ensure
+ we connect back to the previously connected network.
+
+ Steps:
+ 1. Connect to a network.
+ 2. Crash wifi HAL.
+ 3. Ensure we get a disconnect.
+ 4. Ensure we reconnect to the previous network.
+
+ """
+ wutils.wifi_connect(self.dut, self.network, num_of_tries=3)
+ # Restart wificond
+ self.log.info("Crashing wifi HAL")
+ self.dut.adb.shell(WIFI_VENDOR_HAL_KILL_SHELL_COMMAND)
+ wutils.wait_for_disconnect(self.dut)
+ time.sleep(DEFAULT_TIMEOUT)
+ wifi_info = self.dut.droid.wifiGetConnectionInfo()
+ if wifi_info[WifiEnums.SSID_KEY] != self.network[WifiEnums.SSID_KEY]:
+ raise signals.TestFailure("Device did not connect to the"
+ " network after crashing wifi HAL.")
+
+ @test_tracker_info(uuid="")
+ def test_wpa_supplicant_crash_reconnect(self):
+ """Connect to a network, crash wpa_supplicant, then ensure
+ we connect back to the previously connected network.
+
+ Steps:
+ 1. Connect to a network.
+ 2. Crash wpa_supplicant.
+ 3. Ensure we get a disconnect.
+ 4. Ensure we reconnect to the previous network.
+
+ """
+ wutils.wifi_connect(self.dut, self.network, num_of_tries=3)
+ # Restart wificond
+ self.log.info("Crashing wpa_supplicant")
+ self.dut.adb.shell(SUPPLICANT_KILL_SHELL_COMMAND)
+ wutils.wait_for_disconnect(self.dut)
+ time.sleep(DEFAULT_TIMEOUT)
+ wifi_info = self.dut.droid.wifiGetConnectionInfo()
+ if wifi_info[WifiEnums.SSID_KEY] != self.network[WifiEnums.SSID_KEY]:
+ raise signals.TestFailure("Device did not connect to the"
+ " network after crashing wpa_supplicant.")
diff --git a/acts/tests/google/wifi/WifiDiagnosticsTest.py b/acts/tests/google/wifi/WifiDiagnosticsTest.py
new file mode 100644
index 0000000..ddf704e
--- /dev/null
+++ b/acts/tests/google/wifi/WifiDiagnosticsTest.py
@@ -0,0 +1,102 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2018 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import itertools
+import pprint
+import queue
+import time
+
+import acts.base_test
+import acts.signals as signals
+import acts.test_utils.wifi.wifi_test_utils as wutils
+import acts.utils
+
+from acts import asserts
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest
+
+WifiEnums = wutils.WifiEnums
+
+DEFAULT_WAIT_TIME = 2
+
+
+class WifiDiagnosticsTest(WifiBaseTest):
+ """
+ Test Bed Requirement:
+ * One Android device
+ * An open Wi-Fi network.
+ * Verbose logging is on.
+ """
+
+ def __init__(self, controllers):
+ WifiBaseTest.__init__(self, controllers)
+
+ def setup_class(self):
+ self.dut = self.android_devices[0]
+ wutils.wifi_test_device_init(self.dut)
+ req_params = []
+ opt_param = ["open_network"]
+ self.unpack_userparams(
+ req_param_names=req_params, opt_param_names=opt_param)
+
+ if "AccessPoint" in self.user_params:
+ self.legacy_configure_ap_and_start()
+ wutils.wifi_toggle_state(self.dut, True)
+ asserts.assert_true(
+ len(self.open_network) > 0,
+ "Need at least one open network.")
+ self.open_network = self.open_network[0]["2g"]
+
+ def setup_test(self):
+ self.dut.droid.wakeLockAcquireBright()
+ self.dut.droid.wakeUpNow()
+
+ def teardown_test(self):
+ self.dut.droid.wakeLockRelease()
+ self.dut.droid.goToSleepNow()
+ wutils.reset_wifi(self.dut)
+
+
+ def on_fail(self, test_name, begin_time):
+ self.dut.take_bug_report(test_name, begin_time)
+ self.dut.cat_adb_log(test_name, begin_time)
+
+ def teardown_class(self):
+ if "AccessPoint" in self.user_params:
+ del self.user_params["open_network"]
+
+ """Tests"""
+
+ @test_tracker_info(uuid="770caebe-bcb1-43ac-95b6-5dd52dd90e80")
+ def test_ringbuffers_are_dumped_during_lsdebug(self):
+ """Steps:
+ 1. Connect to a open network.
+ 2. Delete old files under data/vendor/tombstones/wifi
+ 3. Call lshal debug on wifi hal component
+ 4. Verify that files are created under data/vender/tombstones/wifi
+ """
+ wutils.connect_to_wifi_network(self.dut, self.open_network)
+ time.sleep(DEFAULT_WAIT_TIME)
+ self.dut.adb.shell("rm data/vendor/tombstones/wifi/*")
+ try:
+ self.dut.adb.shell("lshal debug android.hardware.wifi@1.2::IWifi")
+ except UnicodeDecodeError:
+ """ Gets this error because adb.shell trys to parse the output to a string
+ but ringbuffer dumps should already be generated """
+ self.log.info("Unicode decode error occurred, but this is ok")
+ file_count_plus_one = self.dut.adb.shell("ls -l data/vendor/tombstones/wifi | wc -l")
+ if int(file_count_plus_one) <= 1:
+ raise signals.TestFailure("Failed to create ringbuffer debug files.")
\ No newline at end of file
diff --git a/acts/tests/google/wifi/WifiIOTTest.py b/acts/tests/google/wifi/WifiIOTTest.py
index 7a69229..1f3da05 100755
--- a/acts/tests/google/wifi/WifiIOTTest.py
+++ b/acts/tests/google/wifi/WifiIOTTest.py
@@ -37,7 +37,6 @@
"""
def __init__(self, controllers):
- self.attenuators = None
WifiBaseTest.__init__(self, controllers)
def setup_class(self):
diff --git a/acts/tests/google/wifi/WifiManagerTest.py b/acts/tests/google/wifi/WifiManagerTest.py
index 3c81053..5a5dfeb 100755
--- a/acts/tests/google/wifi/WifiManagerTest.py
+++ b/acts/tests/google/wifi/WifiManagerTest.py
@@ -20,7 +20,7 @@
import time
import acts.base_test
-import acts.signals
+import acts.signals as signals
import acts.test_utils.wifi.wifi_test_utils as wutils
import acts.utils
@@ -76,10 +76,12 @@
def setup_test(self):
self.dut.droid.wakeLockAcquireBright()
self.dut.droid.wakeUpNow()
+ wutils.wifi_toggle_state(self.dut, True)
def teardown_test(self):
self.dut.droid.wakeLockRelease()
self.dut.droid.goToSleepNow()
+ self.turn_location_off_and_scan_toggle_off()
wutils.reset_wifi(self.dut)
def teardown_class(self):
@@ -107,10 +109,8 @@
droid = ad.droid
ed = ad.ed
SSID = network[WifiEnums.SSID_KEY]
- ed.clear_all_events()
- wutils.start_wifi_connection_scan(ad)
- scan_results = droid.wifiGetScanResults()
- wutils.assert_network_in_list({WifiEnums.SSID_KEY: SSID}, scan_results)
+ wutils.start_wifi_connection_scan_and_ensure_network_found(
+ ad, SSID);
wutils.wifi_connect(ad, network, num_of_tries=3)
def get_connection_data(self, dut, network):
@@ -215,12 +215,8 @@
False otherwise.
"""
- self.dut.ed.clear_all_events()
- wutils.start_wifi_connection_scan(self.dut)
- scan_results = self.dut.droid.wifiGetScanResults()
- wutils.assert_network_in_list({
- WifiEnums.SSID_KEY: network_ssid
- }, scan_results)
+ wutils.start_wifi_connection_scan_and_ensure_network_found(
+ self.dut, network_ssid);
wutils.wifi_connect_by_id(self.dut, network_id)
connect_data = self.dut.droid.wifiGetConnectionInfo()
connect_ssid = connect_data[WifiEnums.SSID_KEY]
@@ -325,15 +321,188 @@
idle_time = new_idle_time
wutils.start_wifi_connection_scan(self.dut)
+ def turn_location_on_and_scan_toggle_on(self):
+ """ Turns on wifi location scans.
+ """
+ acts.utils.set_location_service(self.dut, True)
+ self.dut.droid.wifiScannerToggleAlwaysAvailable(True)
+ msg = "Failed to turn on location service's scan."
+ asserts.assert_true(self.dut.droid.wifiScannerIsAlwaysAvailable(), msg)
+
+ def turn_location_off_and_scan_toggle_off(self):
+ """ Turns off wifi location scans.
+ """
+ acts.utils.set_location_service(self.dut, False)
+ self.dut.droid.wifiScannerToggleAlwaysAvailable(False)
+ msg = "Failed to turn off location service's scan."
+ asserts.assert_true(not self.dut.droid.wifiScannerIsAlwaysAvailable(), msg)
+
+ def turn_location_on_and_scan_toggle_off(self):
+ """ Turns off wifi location scans, but keeps location on.
+ """
+ acts.utils.set_location_service(self.dut, True)
+ self.dut.droid.wifiScannerToggleAlwaysAvailable(False)
+ msg = "Failed to turn off location service's scan."
+ asserts.assert_true(not self.dut.droid.wifiScannerIsAlwaysAvailable(), msg)
+
+ def helper_reconnect_toggle_wifi(self):
+ """Connect to multiple networks, turn off/on wifi, then reconnect to
+ a previously connected network.
+
+ Steps:
+ 1. Connect to a 2GHz network.
+ 2. Connect to a 5GHz network.
+ 3. Turn WiFi OFF/ON.
+ 4. Reconnect to the non-current network.
+
+ """
+ connect_2g_data = self.get_connection_data(self.dut, self.wpapsk_2g)
+ connect_5g_data = self.get_connection_data(self.dut, self.wpapsk_5g)
+ wutils.toggle_wifi_off_and_on(self.dut)
+ reconnect_to = self.get_enabled_network(connect_2g_data,
+ connect_5g_data)
+ reconnect = self.connect_to_wifi_network_with_id(
+ reconnect_to[WifiEnums.NETID_KEY],
+ reconnect_to[WifiEnums.SSID_KEY])
+ if not reconnect:
+ raise signals.TestFailure("Device did not connect to the correct"
+ " network after toggling WiFi.")
+
+ def helper_reconnect_toggle_airplane(self):
+ """Connect to multiple networks, turn on/off Airplane moce, then
+ reconnect a previously connected network.
+
+ Steps:
+ 1. Connect to a 2GHz network.
+ 2. Connect to a 5GHz network.
+ 3. Turn ON/OFF Airplane mode.
+ 4. Reconnect to the non-current network.
+
+ """
+ connect_2g_data = self.get_connection_data(self.dut, self.wpapsk_2g)
+ connect_5g_data = self.get_connection_data(self.dut, self.wpapsk_5g)
+ wutils.toggle_airplane_mode_on_and_off(self.dut)
+ reconnect_to = self.get_enabled_network(connect_2g_data,
+ connect_5g_data)
+ reconnect = self.connect_to_wifi_network_with_id(
+ reconnect_to[WifiEnums.NETID_KEY],
+ reconnect_to[WifiEnums.SSID_KEY])
+ if not reconnect:
+ raise signals.TestFailure("Device did not connect to the correct"
+ " network after toggling Airplane mode.")
+
+ def helper_reboot_configstore_reconnect(self):
+ """Connect to multiple networks, reboot then reconnect to previously
+ connected network.
+
+ Steps:
+ 1. Connect to a 2GHz network.
+ 2. Connect to a 5GHz network.
+ 3. Reboot device.
+ 4. Verify all networks are persistent after reboot.
+ 5. Reconnect to the non-current network.
+
+ """
+ network_list = self.connect_multiple_networks(self.dut)
+ self.dut.reboot()
+ time.sleep(DEFAULT_TIMEOUT)
+ self.check_configstore_networks(network_list)
+
+ reconnect_to = self.get_enabled_network(network_list[BAND_2GHZ],
+ network_list[BAND_5GHZ])
+
+ reconnect = self.connect_to_wifi_network_with_id(
+ reconnect_to[WifiEnums.NETID_KEY],
+ reconnect_to[WifiEnums.SSID_KEY])
+ if not reconnect:
+ raise signals.TestFailure(
+ "Device failed to reconnect to the correct"
+ " network after reboot.")
+
+ def helper_toggle_wifi_reboot_configstore_reconnect(self):
+ """Connect to multiple networks, disable WiFi, reboot, then
+ reconnect to previously connected network.
+
+ Steps:
+ 1. Connect to a 2GHz network.
+ 2. Connect to a 5GHz network.
+ 3. Turn WiFi OFF.
+ 4. Reboot device.
+ 5. Turn WiFi ON.
+ 4. Verify all networks are persistent after reboot.
+ 5. Reconnect to the non-current network.
+
+ """
+ network_list = self.connect_multiple_networks(self.dut)
+ self.log.debug("Toggling wifi OFF")
+ wutils.wifi_toggle_state(self.dut, False)
+ time.sleep(DEFAULT_TIMEOUT)
+ self.dut.reboot()
+ time.sleep(DEFAULT_TIMEOUT)
+ self.log.debug("Toggling wifi ON")
+ wutils.wifi_toggle_state(self.dut, True)
+ time.sleep(DEFAULT_TIMEOUT)
+ self.check_configstore_networks(network_list)
+ reconnect_to = self.get_enabled_network(network_list[BAND_2GHZ],
+ network_list[BAND_5GHZ])
+ reconnect = self.connect_to_wifi_network_with_id(
+ reconnect_to[WifiEnums.NETID_KEY],
+ reconnect_to[WifiEnums.SSID_KEY])
+ if not reconnect:
+ msg = ("Device failed to reconnect to the correct network after"
+ " toggling WiFi and rebooting.")
+ raise signals.TestFailure(msg)
+
+ def helper_toggle_airplane_reboot_configstore_reconnect(self):
+ """Connect to multiple networks, enable Airplane mode, reboot, then
+ reconnect to previously connected network.
+
+ Steps:
+ 1. Connect to a 2GHz network.
+ 2. Connect to a 5GHz network.
+ 3. Toggle Airplane mode ON.
+ 4. Reboot device.
+ 5. Toggle Airplane mode OFF.
+ 4. Verify all networks are persistent after reboot.
+ 5. Reconnect to the non-current network.
+
+ """
+ network_list = self.connect_multiple_networks(self.dut)
+ self.log.debug("Toggling Airplane mode ON")
+ asserts.assert_true(
+ acts.utils.force_airplane_mode(self.dut, True),
+ "Can not turn on airplane mode on: %s" % self.dut.serial)
+ time.sleep(DEFAULT_TIMEOUT)
+ self.dut.reboot()
+ time.sleep(DEFAULT_TIMEOUT)
+ self.log.debug("Toggling Airplane mode OFF")
+ asserts.assert_true(
+ acts.utils.force_airplane_mode(self.dut, False),
+ "Can not turn on airplane mode on: %s" % self.dut.serial)
+ time.sleep(DEFAULT_TIMEOUT)
+ self.check_configstore_networks(network_list)
+ reconnect_to = self.get_enabled_network(network_list[BAND_2GHZ],
+ network_list[BAND_5GHZ])
+ reconnect = self.connect_to_wifi_network_with_id(
+ reconnect_to[WifiEnums.NETID_KEY],
+ reconnect_to[WifiEnums.SSID_KEY])
+ if not reconnect:
+ msg = ("Device failed to reconnect to the correct network after"
+ " toggling Airplane mode and rebooting.")
+ raise signals.TestFailure(msg)
+
"""Tests"""
@test_tracker_info(uuid="525fc5e3-afba-4bfd-9a02-5834119e3c66")
- def test_toggle_state(self):
+ def test_toggle_wifi_state_and_get_startupTime(self):
"""Test toggling wifi"""
self.log.debug("Going from on to off.")
wutils.wifi_toggle_state(self.dut, False)
self.log.debug("Going from off to on.")
+ startTime = time.time()
wutils.wifi_toggle_state(self.dut, True)
+ startup_time = time.time() - startTime
+ self.log.debug("WiFi was enabled on the device in %s s." % startup_time)
@test_tracker_info(uuid="e9d11563-2bbe-4c96-87eb-ec919b51435b")
def test_toggle_with_screen(self):
@@ -357,49 +526,28 @@
@test_tracker_info(uuid="71556e06-7fb1-4e2b-9338-b01f1f8e286e")
def test_scan(self):
"""Test wifi connection scan can start and find expected networks."""
- wutils.wifi_toggle_state(self.dut, True)
- self.log.debug("Start regular wifi scan.")
- wutils.start_wifi_connection_scan(self.dut)
- wifi_results = self.dut.droid.wifiGetScanResults()
- self.log.debug("Scan results: %s", wifi_results)
ssid = self.open_network[WifiEnums.SSID_KEY]
- wutils.assert_network_in_list({WifiEnums.SSID_KEY: ssid}, wifi_results)
+ wutils.start_wifi_connection_scan_and_ensure_network_found(
+ self.dut, ssid);
@test_tracker_info(uuid="3ea09efb-6921-429e-afb1-705ef5a09afa")
def test_scan_with_wifi_off_and_location_scan_on(self):
"""Put wifi in scan only mode"""
- acts.utils.set_location_service(self.dut, True)
- self.dut.droid.wifiScannerToggleAlwaysAvailable(True)
- msg = "Failed to turn on location service's scan."
- asserts.assert_true(self.dut.droid.wifiScannerIsAlwaysAvailable(), msg)
+ self.turn_location_on_and_scan_toggle_on()
wutils.wifi_toggle_state(self.dut, False)
"""Test wifi connection scan can start and find expected networks."""
- self.log.debug("Start regular wifi scan.")
- wutils.start_wifi_connection_scan(self.dut)
- wifi_results = self.dut.droid.wifiGetScanResults()
- self.log.debug("Scan results: %s", wifi_results)
ssid = self.open_network[WifiEnums.SSID_KEY]
- wutils.assert_network_in_list({WifiEnums.SSID_KEY: ssid}, wifi_results)
-
- """Turn off location scan and wifi on at the end of the test"""
- wutils.wifi_toggle_state(self.dut, True)
- self.dut.droid.wifiScannerToggleAlwaysAvailable(False)
- msg = "Failed to turn off location service's scan."
- asserts.assert_true(not self.dut.droid.wifiScannerIsAlwaysAvailable(), msg)
- acts.utils.set_location_service(self.dut, False)
+ wutils.start_wifi_connection_scan_and_ensure_network_found(
+ self.dut, ssid);
@test_tracker_info(uuid="770caebe-bcb1-43ac-95b6-5dd52dd90e80")
def test_scan_with_wifi_off_and_location_scan_off(self):
"""Turn off wifi and location scan"""
- acts.utils.set_location_service(self.dut, True)
- self.dut.droid.wifiScannerToggleAlwaysAvailable(False)
- msg = "Failed to turn off location service's scan."
- asserts.assert_true(not self.dut.droid.wifiScannerIsAlwaysAvailable(), msg)
+ self.turn_location_on_and_scan_toggle_off()
wutils.wifi_toggle_state(self.dut, False)
"""Test wifi connection scan should fail."""
- self.log.debug("Start regular wifi scan.")
self.dut.droid.wifiStartScan()
try:
self.dut.ed.pop_event("WifiManagerScanResultsAvailable", 60)
@@ -408,10 +556,6 @@
else:
asserts.fail("Wi-Fi scan results received")
- """Turn wifi on at the end of the test"""
- wutils.wifi_toggle_state(self.dut, True)
- acts.utils.set_location_service(self.dut, False)
-
@test_tracker_info(uuid="a4ad9930-a8fa-4868-81ed-a79c7483e502")
def test_add_network(self):
"""Test wifi connection scan."""
@@ -507,17 +651,23 @@
4. Reconnect to the non-current network.
"""
- connect_2g_data = self.get_connection_data(self.dut, self.wpapsk_2g)
- connect_5g_data = self.get_connection_data(self.dut, self.wpapsk_5g)
- wutils.toggle_wifi_off_and_on(self.dut)
- reconnect_to = self.get_enabled_network(connect_2g_data,
- connect_5g_data)
- reconnect = self.connect_to_wifi_network_with_id(
- reconnect_to[WifiEnums.NETID_KEY],
- reconnect_to[WifiEnums.SSID_KEY])
- if not reconnect:
- raise signals.TestFailure("Device did not connect to the correct"
- " network after toggling WiFi.")
+ self.helper_reconnect_toggle_wifi()
+
+ @test_tracker_info(uuid="bd2cec9e-7f17-44ef-8a0c-4da92a9b55ae")
+ def test_reconnect_toggle_wifi_with_location_scan_on(self):
+ """Connect to multiple networks, turn off/on wifi, then reconnect to
+ a previously connected network.
+
+ Steps:
+ 1. Turn on location scans.
+ 2. Connect to a 2GHz network.
+ 3. Connect to a 5GHz network.
+ 4. Turn WiFi OFF/ON.
+ 5. Reconnect to the non-current network.
+
+ """
+ self.turn_location_on_and_scan_toggle_on()
+ self.helper_reconnect_toggle_wifi()
@test_tracker_info(uuid="8e6e6c21-fefb-4fe8-9fb1-f09b1182b76d")
def test_reconnect_toggle_airplane(self):
@@ -531,17 +681,23 @@
4. Reconnect to the non-current network.
"""
- connect_2g_data = self.get_connection_data(self.dut, self.wpapsk_2g)
- connect_5g_data = self.get_connection_data(self.dut, self.wpapsk_5g)
- wutils.toggle_airplane_mode_on_and_off(self.dut)
- reconnect_to = self.get_enabled_network(connect_2g_data,
- connect_5g_data)
- reconnect = self.connect_to_wifi_network_with_id(
- reconnect_to[WifiEnums.NETID_KEY],
- reconnect_to[WifiEnums.SSID_KEY])
- if not reconnect:
- raise signals.TestFailure("Device did not connect to the correct"
- " network after toggling Airplane mode.")
+ self.helper_reconnect_toggle_airplane()
+
+ @test_tracker_info(uuid="28562f13-8a0a-492e-932c-e587560db5f2")
+ def test_reconnect_toggle_airplane_with_location_scan_on(self):
+ """Connect to multiple networks, turn on/off Airplane moce, then
+ reconnect a previously connected network.
+
+ Steps:
+ 1. Turn on location scans.
+ 2. Connect to a 2GHz network.
+ 3. Connect to a 5GHz network.
+ 4. Turn ON/OFF Airplane mode.
+ 5. Reconnect to the non-current network.
+
+ """
+ self.turn_location_on_and_scan_toggle_on()
+ self.helper_reconnect_toggle_airplane()
@test_tracker_info(uuid="3d041c12-05e2-46a7-ab9b-e3f60cc735db")
def test_reboot_configstore_reconnect(self):
@@ -556,21 +712,24 @@
5. Reconnect to the non-current network.
"""
- network_list = self.connect_multiple_networks(self.dut)
- self.dut.reboot()
- time.sleep(DEFAULT_TIMEOUT)
- self.check_configstore_networks(network_list)
+ self.helper_reboot_configstore_reconnect()
- reconnect_to = self.get_enabled_network(network_list[BAND_2GHZ],
- network_list[BAND_5GHZ])
+ @test_tracker_info(uuid="a70d5853-67b5-4d48-bdf7-08ee51fafd21")
+ def test_reboot_configstore_reconnect_with_location_scan_on(self):
+ """Connect to multiple networks, reboot then reconnect to previously
+ connected network.
- reconnect = self.connect_to_wifi_network_with_id(
- reconnect_to[WifiEnums.NETID_KEY],
- reconnect_to[WifiEnums.SSID_KEY])
- if not reconnect:
- raise signals.TestFailure(
- "Device failed to reconnect to the correct"
- " network after reboot.")
+ Steps:
+ 1. Turn on location scans.
+ 2. Connect to a 2GHz network.
+ 3. Connect to a 5GHz network.
+ 4. Reboot device.
+ 5. Verify all networks are persistent after reboot.
+ 6. Reconnect to the non-current network.
+
+ """
+ self.turn_location_on_and_scan_toggle_on()
+ self.helper_reboot_configstore_reconnect()
@test_tracker_info(uuid="26d94dfa-1349-4c8b-aea0-475eb73bb521")
def test_toggle_wifi_reboot_configstore_reconnect(self):
@@ -587,25 +746,26 @@
5. Reconnect to the non-current network.
"""
- network_list = self.connect_multiple_networks(self.dut)
- self.log.debug("Toggling wifi OFF")
- wutils.wifi_toggle_state(self.dut, False)
- time.sleep(DEFAULT_TIMEOUT)
- self.dut.reboot()
- time.sleep(DEFAULT_TIMEOUT)
- self.log.debug("Toggling wifi ON")
- wutils.wifi_toggle_state(self.dut, True)
- time.sleep(DEFAULT_TIMEOUT)
- self.check_configstore_networks(network_list)
- reconnect_to = self.get_enabled_network(network_list[BAND_2GHZ],
- network_list[BAND_5GHZ])
- reconnect = self.connect_to_wifi_network_with_id(
- reconnect_to[WifiEnums.NETID_KEY],
- reconnect_to[WifiEnums.SSID_KEY])
- if not reconnect:
- msg = ("Device failed to reconnect to the correct network after"
- " toggling WiFi and rebooting.")
- raise signals.TestFailure(msg)
+ self.helper_toggle_wifi_reboot_configstore_reconnect()
+
+ @test_tracker_info(uuid="7c004a3b-c1c6-4371-9124-0f34650be915")
+ def test_toggle_wifi_reboot_configstore_reconnect_with_location_scan_on(self):
+ """Connect to multiple networks, disable WiFi, reboot, then
+ reconnect to previously connected network.
+
+ Steps:
+ 1. Turn on location scans.
+ 2. Connect to a 2GHz network.
+ 3. Connect to a 5GHz network.
+ 4. Turn WiFi OFF.
+ 5. Reboot device.
+ 6. Turn WiFi ON.
+ 7. Verify all networks are persistent after reboot.
+ 8. Reconnect to the non-current network.
+
+ """
+ self.turn_location_on_and_scan_toggle_on()
+ self.helper_toggle_wifi_reboot_configstore_reconnect()
@test_tracker_info(uuid="4fce017b-b443-40dc-a598-51d59d3bb38f")
def test_toggle_airplane_reboot_configstore_reconnect(self):
@@ -622,29 +782,26 @@
5. Reconnect to the non-current network.
"""
- network_list = self.connect_multiple_networks(self.dut)
- self.log.debug("Toggling Airplane mode ON")
- asserts.assert_true(
- acts.utils.force_airplane_mode(self.dut, True),
- "Can not turn on airplane mode on: %s" % self.dut.serial)
- time.sleep(DEFAULT_TIMEOUT)
- self.dut.reboot()
- time.sleep(DEFAULT_TIMEOUT)
- self.log.debug("Toggling Airplane mode OFF")
- asserts.assert_true(
- acts.utils.force_airplane_mode(self.dut, False),
- "Can not turn on airplane mode on: %s" % self.dut.serial)
- time.sleep(DEFAULT_TIMEOUT)
- self.check_configstore_networks(network_list)
- reconnect_to = self.get_enabled_network(network_list[BAND_2GHZ],
- network_list[BAND_5GHZ])
- reconnect = self.connect_to_wifi_network_with_id(
- reconnect_to[WifiEnums.NETID_KEY],
- reconnect_to[WifiEnums.SSID_KEY])
- if not reconnect:
- msg = ("Device failed to reconnect to the correct network after"
- " toggling Airplane mode and rebooting.")
- raise signals.TestFailure(msg)
+ self.helper_toggle_airplane_reboot_configstore_reconnect()
+
+ @test_tracker_info(uuid="7f0810f9-2338-4158-95f5-057f5a1905b6")
+ def test_toggle_airplane_reboot_configstore_reconnect_with_location_scan_on(self):
+ """Connect to multiple networks, enable Airplane mode, reboot, then
+ reconnect to previously connected network.
+
+ Steps:
+ 1. Turn on location scans.
+ 2. Connect to a 2GHz network.
+ 3. Connect to a 5GHz network.
+ 4. Toggle Airplane mode ON.
+ 5. Reboot device.
+ 6. Toggle Airplane mode OFF.
+ 7. Verify all networks are persistent after reboot.
+ 8. Reconnect to the non-current network.
+
+ """
+ self.turn_location_on_and_scan_toggle_on()
+ self.helper_toggle_airplane_reboot_configstore_reconnect()
@test_tracker_info(uuid="81eb7527-4c92-4422-897a-6b5f6445e84a")
def test_config_store_with_wpapsk_2g(self):
diff --git a/acts/tests/google/wifi/WifiScannerMultiScanTest.py b/acts/tests/google/wifi/WifiScannerMultiScanTest.py
index 0ff3574..1b33e57 100755
--- a/acts/tests/google/wifi/WifiScannerMultiScanTest.py
+++ b/acts/tests/google/wifi/WifiScannerMultiScanTest.py
@@ -149,15 +149,12 @@
'numUsage': 0,
'SSID': '"wh_ap1_2g"',
'timestamp': 4280078660,
- 'numConnection': 0,
'BSSID': '30:b5:c2:33:f9:05',
'frequency': 2412,
- 'numIpConfigFailures': 0,
'distanceSdCm': 0,
'distanceCm': 0,
'centerFreq1': 0,
'centerFreq0': 0,
- 'blackListTimestamp': 0,
'venueName': '',
'seen': 0,
'operatorFriendlyName': '',
diff --git a/acts/tests/google/wifi/WifiSoftApTest.py b/acts/tests/google/wifi/WifiSoftApTest.py
index ed8a080..6b8e83e 100644
--- a/acts/tests/google/wifi/WifiSoftApTest.py
+++ b/acts/tests/google/wifi/WifiSoftApTest.py
@@ -48,9 +48,8 @@
utils.sync_device_time(self.dut)
utils.sync_device_time(self.dut_client)
# Set country code explicitly to "US".
- self.dut.adb.shell("halutil -country %s" %
- wutils.WifiEnums.CountryCode.US)
-
+ self.dut.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
+ self.dut_client.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
# Enable verbose logging on the duts
self.dut.droid.wifiEnableVerboseLogging(1)
asserts.assert_equal(self.dut.droid.wifiGetVerboseLoggingLevel(), 1,
@@ -60,11 +59,13 @@
"Failed to enable WiFi verbose logging on the client dut.")
def teardown_class(self):
+ wutils.stop_wifi_tethering(self.dut)
wutils.reset_wifi(self.dut)
wutils.reset_wifi(self.dut_client)
def on_fail(self, test_name, begin_time):
self.dut.take_bug_report(test_name, begin_time)
+ self.dut_client.take_bug_report(test_name, begin_time)
""" Helper Functions """
def verify_return_to_wifi_enabled(self):
@@ -119,15 +120,17 @@
Args:
ap_ssid: SSID of the ap we are looking for.
"""
- #TODO(silberst): debug and remove the extra scan before submitting this test
- wutils.start_wifi_connection_scan(self.dut_client)
- client_scan_results = self.dut_client.droid.wifiGetScanResults()
- wutils.start_wifi_connection_scan(self.dut_client)
- client_scan_results = self.dut_client.droid.wifiGetScanResults()
- for result in client_scan_results:
- self.dut.log.debug("scan found: %s", result[wutils.WifiEnums.SSID_KEY])
- wutils.assert_network_in_list({wutils.WifiEnums.SSID_KEY: ap_ssid},
- client_scan_results)
+ wutils.start_wifi_connection_scan_and_ensure_network_found(
+ self.dut_client, ap_ssid);
+
+ def confirm_softap_not_in_scan_results(self, ap_ssid):
+ """Confirm the ap started by wifi tethering is not seen in scan results.
+
+ Args:
+ ap_ssid: SSID of the ap we are looking for.
+ """
+ wutils.start_wifi_connection_scan_and_ensure_network_not_found(
+ self.dut_client, ap_ssid);
def check_cell_data_and_enable(self):
"""Make sure that cell data is enabled if there is a sim present.
@@ -144,7 +147,7 @@
asserts.assert_true(self.dut.droid.telephonyIsDataEnabled(),
"Failed to enable cell data for softap dut.")
- def validate_full_tether_startup(self, band=None):
+ def validate_full_tether_startup(self, band=None, hidden=None):
"""Test full startup of wifi tethering
1. Report current state.
@@ -162,7 +165,17 @@
config = self.create_softap_config()
wutils.start_wifi_tethering(self.dut,
config[wutils.WifiEnums.SSID_KEY],
- config[wutils.WifiEnums.PWD_KEY], band)
+ config[wutils.WifiEnums.PWD_KEY], band, hidden)
+ if hidden:
+ # First ensure it's not seen in scan results.
+ self.confirm_softap_not_in_scan_results(
+ config[wutils.WifiEnums.SSID_KEY])
+ # If the network is hidden, it should be saved on the client to be
+ # seen in scan results.
+ config[wutils.WifiEnums.HIDDEN_KEY] = True
+ ret = self.dut_client.droid.wifiAddNetwork(config)
+ asserts.assert_true(ret != -1, "Add network %r failed" % config)
+ self.dut_client.droid.wifiEnableNetwork(ret, 0)
self.confirm_softap_in_scan_results(config[wutils.WifiEnums.SSID_KEY])
wutils.stop_wifi_tethering(self.dut)
asserts.assert_false(self.dut.droid.wifiIsApEnabled(),
@@ -228,6 +241,30 @@
"""
self.validate_full_tether_startup(WIFI_CONFIG_APBAND_5G)
+ @test_tracker_info(uuid="d26ee4df-5dcb-4191-829f-05a10b1218a7")
+ def test_full_tether_startup_2G_hidden(self):
+ """Test full startup of wifi tethering in 2G band using hidden AP.
+
+ 1. Report current state.
+ 2. Switch to AP mode.
+ 3. verify SoftAP active.
+ 4. Shutdown wifi tethering.
+ 5. verify back to previous mode.
+ """
+ self.validate_full_tether_startup(WIFI_CONFIG_APBAND_2G, True)
+
+ @test_tracker_info(uuid="229cd585-a789-4c9a-8948-89fa72de9dd5")
+ def test_full_tether_startup_5G_hidden(self):
+ """Test full startup of wifi tethering in 5G band using hidden AP.
+
+ 1. Report current state.
+ 2. Switch to AP mode.
+ 3. verify SoftAP active.
+ 4. Shutdown wifi tethering.
+ 5. verify back to previous mode.
+ """
+ self.validate_full_tether_startup(WIFI_CONFIG_APBAND_5G, True)
+
""" Tests End """
diff --git a/acts/tests/google/wifi/WifiStaApConcurrencyTest.py b/acts/tests/google/wifi/WifiStaApConcurrencyTest.py
new file mode 100755
index 0000000..916519d
--- /dev/null
+++ b/acts/tests/google/wifi/WifiStaApConcurrencyTest.py
@@ -0,0 +1,322 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2018 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import itertools
+import pprint
+import queue
+import time
+
+import acts.base_test
+import acts.signals as signals
+from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_2G
+from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_5G
+import acts.test_utils.wifi.wifi_test_utils as wutils
+import acts.utils as utils
+
+from acts import asserts
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest
+
+WifiEnums = wutils.WifiEnums
+
+class WifiStaApConcurrencyTest(WifiBaseTest):
+ """Tests for STA + AP concurrency scenarions.
+
+ Test Bed Requirement:
+ * Two Android devices (For AP)
+ * One Wi-Fi network visible to the device (for STA).
+ """
+
+ def __init__(self, controllers):
+ WifiBaseTest.__init__(self, controllers)
+
+ def setup_class(self):
+ self.dut = self.android_devices[0]
+ self.dut_client = self.android_devices[1]
+ wutils.wifi_test_device_init(self.dut)
+ wutils.wifi_test_device_init(self.dut_client)
+ # Do a simple version of init - mainly just sync the time and enable
+ # verbose logging. This test will fail if the DUT has a sim and cell
+ # data is disabled. We would also like to test with phones in less
+ # constrained states (or add variations where we specifically
+ # constrain).
+ utils.require_sl4a((self.dut, self.dut_client))
+ utils.sync_device_time(self.dut)
+ utils.sync_device_time(self.dut_client)
+ # Set country code explicitly to "US".
+ self.dut.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
+ self.dut_client.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
+ # Enable verbose logging on the duts
+ self.dut.droid.wifiEnableVerboseLogging(1)
+ asserts.assert_equal(self.dut.droid.wifiGetVerboseLoggingLevel(), 1,
+ "Failed to enable WiFi verbose logging on the softap dut.")
+ self.dut_client.droid.wifiEnableVerboseLogging(1)
+ asserts.assert_equal(self.dut_client.droid.wifiGetVerboseLoggingLevel(), 1,
+ "Failed to enable WiFi verbose logging on the client dut.")
+
+ req_params = ["reference_networks"]
+ opt_param = ["iperf_server_address"]
+ self.unpack_userparams(
+ req_param_names=req_params, opt_param_names=opt_param)
+
+ if "AccessPoint" in self.user_params:
+ self.legacy_configure_ap_and_start()
+
+ asserts.assert_true(
+ len(self.reference_networks) >= 1,
+ "Need at least 1 reference network with psk.")
+ asserts.assert_true(
+ self.reference_networks[0]["2g"],
+ "Need at least 1 2.4Ghz reference network with psk.")
+ asserts.assert_true(
+ self.reference_networks[0]["5g"],
+ "Need at least 1 5Ghz reference network with psk.")
+ if "iperf_server_address" in self.user_params:
+ self.iperf_server = self.iperf_servers[0]
+ self.wpapsk_2g = self.reference_networks[0]["2g"]
+ self.wpapsk_5g = self.reference_networks[0]["5g"]
+ if hasattr(self, 'iperf_server'):
+ self.iperf_server.start()
+
+ def setup_test(self):
+ self.dut.droid.wakeLockAcquireBright()
+ self.dut.droid.wakeUpNow()
+ self.turn_location_off_and_scan_toggle_off()
+ wutils.wifi_toggle_state(self.dut, False)
+ wutils.wifi_toggle_state(self.dut_client, False)
+
+ def teardown_test(self):
+ self.dut.droid.wakeLockRelease()
+ self.dut.droid.goToSleepNow()
+ wutils.stop_wifi_tethering(self.dut)
+ wutils.reset_wifi(self.dut)
+ wutils.reset_wifi(self.dut_client)
+
+ def teardown_class(self):
+ if hasattr(self, 'iperf_server'):
+ self.iperf_server.stop()
+
+ def on_fail(self, test_name, begin_time):
+ self.dut.take_bug_report(test_name, begin_time)
+ self.dut.cat_adb_log(test_name, begin_time)
+
+ def teardown_class(self):
+ if "AccessPoint" in self.user_params:
+ del self.user_params["reference_networks"]
+ del self.user_params["open_network"]
+
+ """Helper Functions"""
+ def turn_location_on_and_scan_toggle_on(self):
+ """ Turns on wifi location scans.
+ """
+ acts.utils.set_location_service(self.dut, True)
+ self.dut.droid.wifiScannerToggleAlwaysAvailable(True)
+ msg = "Failed to turn on location service's scan."
+ asserts.assert_true(self.dut.droid.wifiScannerIsAlwaysAvailable(), msg)
+
+ def turn_location_off_and_scan_toggle_off(self):
+ """ Turns off wifi location scans.
+ """
+ acts.utils.set_location_service(self.dut, False)
+ self.dut.droid.wifiScannerToggleAlwaysAvailable(False)
+ msg = "Failed to turn off location service's scan."
+ asserts.assert_true(not self.dut.droid.wifiScannerIsAlwaysAvailable(), msg)
+
+ def run_iperf_client(self, params):
+ """Run iperf traffic after connection.
+
+ Args:
+ params: A tuple of network info and AndroidDevice object.
+ """
+ if "iperf_server_address" in self.user_params:
+ wait_time = 5
+ network, ad = params
+ SSID = network[WifiEnums.SSID_KEY]
+ self.log.info("Starting iperf traffic through {}".format(SSID))
+ time.sleep(wait_time)
+ port_arg = "-p {}".format(self.iperf_server.port)
+ success, data = ad.run_iperf_client(self.iperf_server_address,
+ port_arg)
+ self.log.debug(pprint.pformat(data))
+ asserts.assert_true(success, "Error occurred in iPerf traffic.")
+
+ def connect_to_wifi_network_and_verify(self, params):
+ """Connection logic for open and psk wifi networks.
+
+ Args:
+ params: A tuple of network info and AndroidDevice object.
+ """
+ network, ad = params
+ droid = ad.droid
+ ed = ad.ed
+ SSID = network[WifiEnums.SSID_KEY]
+ wutils.start_wifi_connection_scan_and_ensure_network_found(
+ ad, SSID);
+ wutils.wifi_connect(ad, network, num_of_tries=3)
+
+ def confirm_softap_in_scan_results(self, ap_ssid):
+ """Confirm the ap started by wifi tethering is seen in scan results.
+
+ Args:
+ ap_ssid: SSID of the ap we are looking for.
+ """
+ wutils.start_wifi_connection_scan_and_ensure_network_found(
+ self.dut_client, ap_ssid);
+
+ def create_softap_config(self):
+ """Create a softap config with ssid and password."""
+ ap_ssid = "softap_" + utils.rand_ascii_str(8)
+ ap_password = utils.rand_ascii_str(8)
+ self.dut.log.info("softap setup: %s %s", ap_ssid, ap_password)
+ config = {wutils.WifiEnums.SSID_KEY: ap_ssid}
+ config[wutils.WifiEnums.PWD_KEY] = ap_password
+ return config
+
+ def start_softap_and_verify(self, band):
+ """Test startup of softap
+
+ 1. Brinup AP mode.
+ 2. Verify SoftAP active using the client device.
+ """
+ config = self.create_softap_config()
+ wutils.start_wifi_tethering(self.dut,
+ config[wutils.WifiEnums.SSID_KEY],
+ config[wutils.WifiEnums.PWD_KEY], band)
+ wutils.wifi_toggle_state(self.dut_client, True)
+ self.confirm_softap_in_scan_results(config[wutils.WifiEnums.SSID_KEY])
+
+ def connect_to_wifi_network_and_start_softap(self, nw_params, softap_band):
+ """Test concurrenct wifi connection and softap.
+ This helper method first makes a wifi conenction and then starts SoftAp.
+
+ Args:
+ nw_params: Params for network STA connection.
+ softap_band: Band for the AP.
+
+ 1. Bring up wifi.
+ 2. Establish connection to a network.
+ 3. Bring up softap and verify AP is seen on a client device.
+ 4. Run iperf on the wifi connection to the network.
+ """
+ wutils.wifi_toggle_state(self.dut, True)
+ self.connect_to_wifi_network_and_verify((nw_params, self.dut))
+ self.start_softap_and_verify(softap_band)
+ self.run_iperf_client((nw_params, self.dut))
+ # Verify that both softap & wifi is enabled concurrently.
+ self.verify_wifi_and_softap_enabled()
+
+ def start_softap_and_connect_to_wifi_network(self, nw_params, softap_band):
+ """Test concurrenct wifi connection and softap.
+ This helper method first starts SoftAp and then makes a wifi conenction.
+
+ Args:
+ nw_params: Params for network STA connection.
+ softap_band: Band for the AP.
+
+ 1. Bring up softap and verify AP is seen on a client device.
+ 2. Bring up wifi.
+ 3. Establish connection to a network.
+ 4. Run iperf on the wifi connection to the network.
+ """
+ self.start_softap_and_verify(softap_band)
+ wutils.wifi_toggle_state(self.dut, True)
+ self.connect_to_wifi_network_and_verify((nw_params, self.dut))
+ self.run_iperf_client((nw_params, self.dut))
+ # Verify that both softap & wifi is enabled concurrently.
+ self.verify_wifi_and_softap_enabled()
+
+ def verify_wifi_and_softap_enabled(self):
+ """Helper to verify both wifi and softap is enabled
+ """
+ asserts.assert_true(self.dut.droid.wifiCheckState(),
+ "Wifi is not reported as running");
+ asserts.assert_false(self.dut.droid.wifiIsApEnabled(),
+ "SoftAp is not reported as running")
+
+ """Tests"""
+ @test_tracker_info(uuid="")
+ def test_wifi_connection_2G_softap_2G(self):
+ """Tests connection to 2G network followed by bringing up SoftAp on 2G.
+ """
+ self.connect_to_wifi_network_and_start_softap(
+ self.wpapsk_2g, WIFI_CONFIG_APBAND_2G)
+
+ @test_tracker_info(uuid="")
+ def test_wifi_connection_5G_softap_5G(self):
+ """Tests connection to 5G network followed by bringing up SoftAp on 5G.
+ """
+ self.connect_to_wifi_network_and_start_softap(
+ self.wpapsk_5g, WIFI_CONFIG_APBAND_5G)
+
+ @test_tracker_info(uuid="")
+ def test_wifi_connection_5G_softap_2G(self):
+ """Tests connection to 5G network followed by bringing up SoftAp on 2G.
+ """
+ self.connect_to_wifi_network_and_start_softap(
+ self.wpapsk_5g, WIFI_CONFIG_APBAND_2G)
+
+ @test_tracker_info(uuid="")
+ def test_wifi_connection_2G_softap_5G(self):
+ """Tests connection to 2G network followed by bringing up SoftAp on 5G.
+ """
+ self.connect_to_wifi_network_and_start_softap(
+ self.wpapsk_2g, WIFI_CONFIG_APBAND_5G)
+
+ @test_tracker_info(uuid="")
+ def test_wifi_connection_5G_softap_2G_with_location_scan_on(self):
+ """Tests connection to 5G network followed by bringing up SoftAp on 2G
+ with location scans turned on.
+ """
+ self.turn_location_on_and_scan_toggle_on()
+ self.connect_to_wifi_network_and_start_softap(
+ self.wpapsk_5g, WIFI_CONFIG_APBAND_2G)
+
+ @test_tracker_info(uuid="")
+ def test_softap_2G_wifi_connection_2G(self):
+ """Tests bringing up SoftAp on 2G followed by connection to 2G network.
+ """
+ self.start_softap_and_connect_to_wifi_network(
+ self.wpapsk_2g, WIFI_CONFIG_APBAND_2G)
+
+ @test_tracker_info(uuid="")
+ def test_softap_5G_wifi_connection_5G(self):
+ """Tests bringing up SoftAp on 5G followed by connection to 5G network.
+ """
+ self.start_softap_and_connect_to_wifi_network(
+ self.wpapsk_5g, WIFI_CONFIG_APBAND_5G)
+
+ @test_tracker_info(uuid="")
+ def test_softap_5G_wifi_connection_2G(self):
+ """Tests bringing up SoftAp on 5G followed by connection to 2G network.
+ """
+ self.start_softap_and_connect_to_wifi_network(
+ self.wpapsk_5g, WIFI_CONFIG_APBAND_2G)
+
+ @test_tracker_info(uuid="")
+ def test_softap_2G_wifi_connection_5G(self):
+ """Tests bringing up SoftAp on 2G followed by connection to 5G network.
+ """
+ self.start_softap_and_connect_to_wifi_network(
+ self.wpapsk_2g, WIFI_CONFIG_APBAND_5G)
+
+ @test_tracker_info(uuid="")
+ def test_softap_5G_wifi_connection_2G_with_location_scan_on(self):
+ """Tests bringing up SoftAp on 5G followed by connection to 2G network
+ with location scans turned on.
+ """
+ self.turn_location_on_and_scan_toggle_on()
+ self.start_softap_and_connect_to_wifi_network(
+ self.wpapsk_5g, WIFI_CONFIG_APBAND_2G)
diff --git a/acts/tests/google/wifi/WifiStressTest.py b/acts/tests/google/wifi/WifiStressTest.py
new file mode 100755
index 0000000..bb574fb
--- /dev/null
+++ b/acts/tests/google/wifi/WifiStressTest.py
@@ -0,0 +1,263 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2018 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pprint
+import time
+
+import acts.base_test
+import acts.test_utils.wifi.wifi_test_utils as wutils
+import acts.utils
+
+from acts import asserts
+from acts import signals
+from acts import utils
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest
+WifiEnums = wutils.WifiEnums
+
+WAIT_FOR_AUTO_CONNECT = 40
+WAIT_BEFORE_CONNECTION = 30
+
+TIMEOUT = 1
+
+
+class WifiStressTest(WifiBaseTest):
+ """WiFi Stress test class.
+
+ Test Bed Requirement:
+ * Two Android device
+ * Several Wi-Fi networks visible to the device, including an open Wi-Fi
+ network.
+ """
+
+ def __init__(self, controllers):
+ WifiBaseTest.__init__(self, controllers)
+
+ def setup_class(self):
+ self.dut = self.android_devices[0]
+ self.dut_client = self.android_devices[1]
+ wutils.wifi_test_device_init(self.dut)
+ req_params = []
+ opt_param = [
+ "open_network", "reference_networks", "iperf_server_address",
+ "stress_count"]
+ self.unpack_userparams(
+ req_param_names=req_params, opt_param_names=opt_param)
+
+ if "AccessPoint" in self.user_params:
+ self.legacy_configure_ap_and_start()
+
+ asserts.assert_true(
+ len(self.reference_networks) > 0,
+ "Need at least one reference network with psk.")
+ self.wpa_2g = self.reference_networks[0]["2g"]
+ self.wpa_5g = self.reference_networks[0]["5g"]
+ self.open_2g = self.open_network[0]["2g"]
+ self.open_5g = self.open_network[0]["5g"]
+ self.networks = [self.wpa_2g, self.wpa_5g, self.open_2g, self.open_5g]
+ if "iperf_server_address" in self.user_params:
+ self.iperf_server = self.iperf_servers[0]
+ if hasattr(self, 'iperf_server'):
+ self.iperf_server.start()
+
+ def setup_test(self):
+ self.dut.droid.wakeLockAcquireBright()
+ self.dut.droid.wakeUpNow()
+
+ def teardown_test(self):
+ self.dut.droid.wakeLockRelease()
+ self.dut.droid.goToSleepNow()
+
+ def on_fail(self, test_name, begin_time):
+ self.dut.take_bug_report(test_name, begin_time)
+ self.dut.cat_adb_log(test_name, begin_time)
+ pass
+
+ def teardown_class(self):
+ wutils.reset_wifi(self.dut)
+ if hasattr(self, 'iperf_server'):
+ self.iperf_server.stop()
+ if "AccessPoint" in self.user_params:
+ del self.user_params["reference_networks"]
+ del self.user_params["open_network"]
+
+ """Helper Functions"""
+
+ def scan_and_connect_by_ssid(self, network):
+ """Scan for network and connect using network information.
+
+ Args:
+ network: A dictionary representing the network to connect to.
+
+ """
+ ssid = network[WifiEnums.SSID_KEY]
+ wutils.start_wifi_connection_scan_and_ensure_network_found(self.dut,
+ ssid)
+ wutils.wifi_connect(self.dut, network, num_of_tries=3)
+
+ def scan_and_connect_by_id(self, network, net_id):
+ """Scan for network and connect using network id.
+
+ Args:
+ net_id: Integer specifying the network id of the network.
+
+ """
+ ssid = network[WifiEnums.SSID_KEY]
+ wutils.start_wifi_connection_scan_and_ensure_network_found(self.dut,
+ ssid)
+ wutils.wifi_connect_by_id(self.dut, net_id)
+
+
+ """Tests"""
+
+ @test_tracker_info(uuid="")
+ def test_stress_toggle_wifi_state(self):
+ """Toggle WiFi state ON and OFF for N times."""
+ for count in range(self.stress_count):
+ """Test toggling wifi"""
+ self.log.debug("Going from on to off.")
+ wutils.wifi_toggle_state(self.dut, False)
+ self.log.debug("Going from off to on.")
+ startTime = time.time()
+ wutils.wifi_toggle_state(self.dut, True)
+ startup_time = time.time() - startTime
+ self.log.debug("WiFi was enabled on the device in %s s." % startup_time)
+
+ @test_tracker_info(uuid="")
+ def test_stress_connect_traffic_disconnect_5g(self):
+ """Test to connect and disconnect from a network for N times.
+
+ Steps:
+ 1. Scan and connect to a network.
+ 2. Run IPerf to upload data for few seconds.
+ 3. Disconnect.
+ 4. Repeat 1-3.
+
+ """
+ net_id = self.dut.droid.wifiAddNetwork(self.wpa_5g)
+ asserts.assert_true(net_id != -1, "Add network %r failed" % self.wpa_5g)
+ self.dut.droid.wifiEnableNetwork(net_id, 0)
+ for count in range(self.stress_count):
+ self.scan_and_connect_by_id(self.wpa_5g, net_id)
+ # Start IPerf traffic from phone to server.
+ # Upload data for 10s.
+ args = "-p {} -t {}".format(self.iperf_server.port, 10)
+ self.log.info("Running iperf client {}".format(args))
+ result, data = self.dut.run_iperf_client(self.iperf_server_address, args)
+ self.dut.droid.wifiDisconnect()
+ time.sleep(WAIT_BEFORE_CONNECTION)
+ if not result:
+ self.log.debug("Error occurred in iPerf traffic.")
+ raise signals.TestFailure("Error occurred in iPerf traffic. Current"
+ " WiFi state = %d" % self.dut.droid.wifiCheckState())
+
+ @test_tracker_info(uuid="")
+ def test_stress_connect_long_traffic_5g(self):
+ """Test to connect to network and hold connection for few hours.
+
+ Steps:
+ 1. Scan and connect to a network.
+ 2. Run IPerf to download data for few hours.
+ 3. Verify no WiFi disconnects/data interruption.
+
+ """
+ self.scan_and_connect_by_ssid(self.wpa_5g)
+ # Start IPerf traffic from server to phone.
+ # Download data for 5 hours.
+ sec = 5*60*60
+ args = "-p {} -t {} -R".format(self.iperf_server.port, sec)
+ self.log.info("Running iperf client {}".format(args))
+ result, data = self.dut.run_iperf_client(self.iperf_server_address,
+ args, timeout=sec+1)
+ self.dut.droid.wifiDisconnect()
+ if not result:
+ self.log.debug("Error occurred in iPerf traffic.")
+ raise signals.TestFailure("Error occurred in iPerf traffic. Current"
+ " WiFi state = %d" % self.dut.droid.wifiCheckState())
+
+ @test_tracker_info(uuid="")
+ def test_stress_wifi_failover(self):
+ """This test does aggressive failover to several networks in list.
+
+ Steps:
+ 1. Add and enable few networks.
+ 2. Let device auto-connect.
+ 3. Remove the connected network.
+ 4. Repeat 2-3.
+ 5. Device should connect to a network until all networks are
+ exhausted.
+
+ """
+ for count in range(self.stress_count):
+ ssids = list()
+ for network in self.networks:
+ ssids.append(network[WifiEnums.SSID_KEY])
+ ret = self.dut.droid.wifiAddNetwork(network)
+ asserts.assert_true(ret != -1, "Add network %r failed" % network)
+ self.dut.droid.wifiEnableNetwork(ret, 0)
+ time.sleep(WAIT_FOR_AUTO_CONNECT)
+ cur_network = self.dut.droid.wifiGetConnectionInfo()
+ cur_ssid = cur_network[WifiEnums.SSID_KEY]
+ for count in range(0,len(self.networks)):
+ self.log.debug("Forget network %s" % cur_ssid)
+ wutils.wifi_forget_network(self.dut, cur_ssid)
+ time.sleep(WAIT_FOR_AUTO_CONNECT)
+ cur_network = self.dut.droid.wifiGetConnectionInfo()
+ cur_ssid = cur_network[WifiEnums.SSID_KEY]
+ if count == len(self.networks) - 1:
+ break
+ if cur_ssid not in ssids:
+ raise signals.TestFailure("Device did not failover to the "
+ "expected network. SSID = %s" % cur_ssid)
+ network_config = self.dut.droid.wifiGetConfiguredNetworks()
+ if len(network_config):
+ raise signals.TestFailure("All the network configurations were not "
+ "removed. Configured networks = %s" % network_config)
+
+ @test_tracker_info(uuid="")
+ def test_stress_softAP_startup_and_stop_5g(self):
+ """Test to bring up softAP and down for N times.
+
+ Steps:
+ 1. Bring up softAP on 5G.
+ 2. Check for softAP on teh client device.
+ 3. Turn ON WiFi.
+ 4. Verify softAP is turned down and WiFi is up.
+
+ """
+ ap_ssid = "softap_" + utils.rand_ascii_str(8)
+ ap_password = utils.rand_ascii_str(8)
+ self.dut.log.info("softap setup: %s %s", ap_ssid, ap_password)
+ config = {wutils.WifiEnums.SSID_KEY: ap_ssid}
+ config[wutils.WifiEnums.PWD_KEY] = ap_password
+ for count in range(self.stress_count):
+ initial_wifi_state = self.dut.droid.wifiCheckState()
+ wutils.start_wifi_tethering(self.dut,
+ ap_ssid,
+ ap_password,
+ WifiEnums.WIFI_CONFIG_APBAND_5G)
+ wutils.start_wifi_connection_scan_and_ensure_network_found(
+ self.dut_client, ap_ssid)
+ # Toggle WiFi ON, which inturn calls softAP teardown.
+ wutils.wifi_toggle_state(self.dut, True)
+ time.sleep(TIMEOUT)
+ asserts.assert_false(self.dut.droid.wifiIsApEnabled(),
+ "SoftAp failed to shutdown!")
+ time.sleep(TIMEOUT)
+ cur_wifi_state = self.dut.droid.wifiCheckState()
+ if initial_wifi_state != cur_wifi_state:
+ raise signals.TestFailure("Wifi state was %d before softAP and %d now!" %
+ (initial_wifi_state, cur_wifi_state))
diff --git a/acts/tests/google/wifi/aware/functional/AttachTest.py b/acts/tests/google/wifi/aware/functional/AttachTest.py
index 598cca6..a1a420b 100644
--- a/acts/tests/google/wifi/aware/functional/AttachTest.py
+++ b/acts/tests/google/wifi/aware/functional/AttachTest.py
@@ -16,12 +16,13 @@
import time
+from acts import asserts
+from acts import utils
from acts.test_decorators import test_tracker_info
from acts.test_utils.wifi import wifi_test_utils as wutils
from acts.test_utils.wifi.aware import aware_const as aconsts
from acts.test_utils.wifi.aware import aware_test_utils as autils
from acts.test_utils.wifi.aware.AwareBaseTest import AwareBaseTest
-from acts.utils import force_airplane_mode
class AttachTest(AwareBaseTest):
@@ -99,8 +100,8 @@
"""Function test case / Attach test cases / attempt to attach with wifi off
Validates that if trying to attach with Wi-Fi disabled will receive the
- expected failure callback. As a side-effect also validates that the broadcast
- for Aware unavailable is received.
+ expected failure callback. As a side-effect also validates that the
+ broadcast for Aware unavailable is received.
"""
dut = self.android_devices[0]
wutils.wifi_toggle_state(dut, False)
@@ -108,6 +109,37 @@
dut.droid.wifiAwareAttach()
autils.wait_for_event(dut, aconsts.EVENT_CB_ON_ATTACH_FAILED)
+ def test_attach_with_doze(self):
+ """Function test case / Attach test cases / attempt to attach with doze on
+
+ Validates that if trying to attach with device in doze mode will receive the
+ expected failure callback. As a side-effect also validates that the
+ broadcast for Aware unavailable is received.
+ """
+ dut = self.android_devices[0]
+ asserts.assert_true(utils.enable_doze(dut), "Can't enable doze")
+ autils.wait_for_event(dut, aconsts.BROADCAST_WIFI_AWARE_NOT_AVAILABLE)
+ dut.droid.wifiAwareAttach()
+ autils.wait_for_event(dut, aconsts.EVENT_CB_ON_ATTACH_FAILED)
+ asserts.assert_true(utils.disable_doze(dut), "Can't disable doze")
+ autils.wait_for_event(dut, aconsts.BROADCAST_WIFI_AWARE_AVAILABLE)
+
+ def test_attach_with_location_off(self):
+ """Function test case / Attach test cases / attempt to attach with location
+ mode off.
+
+ Validates that if trying to attach with device location mode off will
+ receive the expected failure callback. As a side-effect also validates that
+ the broadcast for Aware unavailable is received.
+ """
+ dut = self.android_devices[0]
+ utils.set_location_service(dut, False)
+ autils.wait_for_event(dut, aconsts.BROADCAST_WIFI_AWARE_NOT_AVAILABLE)
+ dut.droid.wifiAwareAttach()
+ autils.wait_for_event(dut, aconsts.EVENT_CB_ON_ATTACH_FAILED)
+ utils.set_location_service(dut, True)
+ autils.wait_for_event(dut, aconsts.BROADCAST_WIFI_AWARE_AVAILABLE)
+
@test_tracker_info(uuid="7ffde8e7-a010-4b77-97f5-959f263b5249")
def test_attach_apm_toggle_attach_again(self):
"""Validates that enabling Airplane mode while Aware is on resets it
@@ -120,12 +152,12 @@
autils.wait_for_event(dut, aconsts.EVENT_CB_ON_ATTACHED)
# enable airplane mode
- force_airplane_mode(dut, True)
+ utils.force_airplane_mode(dut, True)
autils.wait_for_event(dut, aconsts.BROADCAST_WIFI_AWARE_NOT_AVAILABLE)
# wait a few seconds and disable airplane mode
time.sleep(10)
- force_airplane_mode(dut, False)
+ utils.force_airplane_mode(dut, False)
autils.wait_for_event(dut, aconsts.BROADCAST_WIFI_AWARE_AVAILABLE)
# try enabling Aware again (attach)
diff --git a/acts/tests/google/wifi/aware/functional/DataPathTest.py b/acts/tests/google/wifi/aware/functional/DataPathTest.py
index 66ec715..120fc88 100644
--- a/acts/tests/google/wifi/aware/functional/DataPathTest.py
+++ b/acts/tests/google/wifi/aware/functional/DataPathTest.py
@@ -17,6 +17,7 @@
import time
from acts import asserts
+from acts import utils
from acts.test_decorators import test_tracker_info
from acts.test_utils.net import connectivity_const as cconsts
from acts.test_utils.wifi.aware import aware_const as aconsts
@@ -148,7 +149,8 @@
use_peer_id,
passphrase_to_use=None,
pub_on_both=False,
- pub_on_both_same=True):
+ pub_on_both_same=True,
+ expect_failure=False):
"""Runs the in-band data-path tests.
Args:
@@ -163,6 +165,8 @@
publisher isn't used (existing to test use-case).
pub_on_both_same: If True then the second publish uses an identical
service name, otherwise a different service name.
+ expect_failure: If True then don't expect NDP formation, otherwise expect
+ NDP setup to succeed.
"""
(p_dut, s_dut, p_id, s_id, p_disc_id, s_disc_id, peer_id_on_sub,
peer_id_on_pub) = self.set_up_discovery(ptype, stype, use_peer_id,
@@ -189,51 +193,59 @@
s_dut.droid.wifiAwareCreateNetworkSpecifier(s_disc_id, peer_id_on_sub,
passphrase, pmk))
- # Publisher & Subscriber: wait for network formation
- p_net_event = autils.wait_for_event_with_keys(
- p_dut, cconsts.EVENT_NETWORK_CALLBACK,
- autils.EVENT_NDP_TIMEOUT,
- (cconsts.NETWORK_CB_KEY_EVENT,
- cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED),
- (cconsts.NETWORK_CB_KEY_ID, p_req_key))
- s_net_event = autils.wait_for_event_with_keys(
- s_dut, cconsts.EVENT_NETWORK_CALLBACK,
- autils.EVENT_NDP_TIMEOUT,
- (cconsts.NETWORK_CB_KEY_EVENT,
- cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED),
- (cconsts.NETWORK_CB_KEY_ID, s_req_key))
+ if expect_failure:
+ # Publisher & Subscriber: fail on network formation
+ time.sleep(autils.EVENT_NDP_TIMEOUT)
+ autils.fail_on_event_with_keys(p_dut, cconsts.EVENT_NETWORK_CALLBACK, 0,
+ (cconsts.NETWORK_CB_KEY_ID, p_req_key))
+ autils.fail_on_event_with_keys(s_dut, cconsts.EVENT_NETWORK_CALLBACK, 0,
+ (cconsts.NETWORK_CB_KEY_ID, s_req_key))
+ else:
+ # Publisher & Subscriber: wait for network formation
+ p_net_event = autils.wait_for_event_with_keys(
+ p_dut, cconsts.EVENT_NETWORK_CALLBACK,
+ autils.EVENT_NDP_TIMEOUT,
+ (cconsts.NETWORK_CB_KEY_EVENT,
+ cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED),
+ (cconsts.NETWORK_CB_KEY_ID, p_req_key))
+ s_net_event = autils.wait_for_event_with_keys(
+ s_dut, cconsts.EVENT_NETWORK_CALLBACK,
+ autils.EVENT_NDP_TIMEOUT,
+ (cconsts.NETWORK_CB_KEY_EVENT,
+ cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED),
+ (cconsts.NETWORK_CB_KEY_ID, s_req_key))
- p_aware_if = p_net_event["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME]
- s_aware_if = s_net_event["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME]
- self.log.info("Interface names: p=%s, s=%s", p_aware_if, s_aware_if)
+ p_aware_if = p_net_event["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME]
+ s_aware_if = s_net_event["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME]
+ self.log.info("Interface names: p=%s, s=%s", p_aware_if, s_aware_if)
- p_ipv6 = p_dut.droid.connectivityGetLinkLocalIpv6Address(p_aware_if).split(
- "%")[0]
- s_ipv6 = s_dut.droid.connectivityGetLinkLocalIpv6Address(s_aware_if).split(
- "%")[0]
- self.log.info("Interface addresses (IPv6): p=%s, s=%s", p_ipv6, s_ipv6)
+ p_ipv6 = \
+ p_dut.droid.connectivityGetLinkLocalIpv6Address(p_aware_if).split("%")[0]
+ s_ipv6 = \
+ s_dut.droid.connectivityGetLinkLocalIpv6Address(s_aware_if).split("%")[0]
+ self.log.info("Interface addresses (IPv6): p=%s, s=%s", p_ipv6, s_ipv6)
- # TODO: possibly send messages back and forth, prefer to use netcat/nc
+ # TODO: possibly send messages back and forth, prefer to use netcat/nc
- # terminate sessions and wait for ON_LOST callbacks
- p_dut.droid.wifiAwareDestroy(p_id)
- s_dut.droid.wifiAwareDestroy(s_id)
+ # terminate sessions and wait for ON_LOST callbacks
+ p_dut.droid.wifiAwareDestroy(p_id)
+ s_dut.droid.wifiAwareDestroy(s_id)
- autils.wait_for_event_with_keys(
- p_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT,
- (cconsts.NETWORK_CB_KEY_EVENT,
- cconsts.NETWORK_CB_LOST), (cconsts.NETWORK_CB_KEY_ID, p_req_key))
- autils.wait_for_event_with_keys(
- s_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT,
- (cconsts.NETWORK_CB_KEY_EVENT,
- cconsts.NETWORK_CB_LOST), (cconsts.NETWORK_CB_KEY_ID, s_req_key))
+ autils.wait_for_event_with_keys(
+ p_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT,
+ (cconsts.NETWORK_CB_KEY_EVENT,
+ cconsts.NETWORK_CB_LOST), (cconsts.NETWORK_CB_KEY_ID, p_req_key))
+ autils.wait_for_event_with_keys(
+ s_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT,
+ (cconsts.NETWORK_CB_KEY_EVENT,
+ cconsts.NETWORK_CB_LOST), (cconsts.NETWORK_CB_KEY_ID, s_req_key))
# clean-up
p_dut.droid.connectivityUnregisterNetworkCallback(p_req_key)
s_dut.droid.connectivityUnregisterNetworkCallback(s_req_key)
def run_oob_data_path_test(self, encr_type, use_peer_id,
- setup_discovery_sessions=False):
+ setup_discovery_sessions=False, expect_failure=False):
"""Runs the out-of-band data-path tests.
Args:
@@ -243,6 +255,8 @@
setup_discovery_sessions: If True also set up a (spurious) discovery
session (pub on both sides, sub on Responder side). Validates a corner
case.
+ expect_failure: If True then don't expect NDP formation, otherwise expect
+ NDP setup to succeed.
"""
init_dut = self.android_devices[0]
init_dut.pretty_name = "Initiator"
@@ -299,47 +313,57 @@
init_dut.droid.wifiAwareCreateNetworkSpecifierOob(
init_id, aconsts.DATA_PATH_INITIATOR, resp_mac, passphrase, pmk))
- # Initiator & Responder: wait for network formation
- init_net_event = autils.wait_for_event_with_keys(
- init_dut, cconsts.EVENT_NETWORK_CALLBACK,
- autils.EVENT_NDP_TIMEOUT,
- (cconsts.NETWORK_CB_KEY_EVENT,
- cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED),
- (cconsts.NETWORK_CB_KEY_ID, init_req_key))
- resp_net_event = autils.wait_for_event_with_keys(
- resp_dut, cconsts.EVENT_NETWORK_CALLBACK,
- autils.EVENT_NDP_TIMEOUT,
- (cconsts.NETWORK_CB_KEY_EVENT,
- cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED),
- (cconsts.NETWORK_CB_KEY_ID, resp_req_key))
+ if expect_failure:
+ # Initiator & Responder: fail on network formation
+ time.sleep(autils.EVENT_NDP_TIMEOUT)
+ autils.fail_on_event_with_keys(resp_dut, cconsts.EVENT_NETWORK_CALLBACK,
+ 0,
+ (cconsts.NETWORK_CB_KEY_ID, resp_req_key))
+ autils.fail_on_event_with_keys(init_dut, cconsts.EVENT_NETWORK_CALLBACK,
+ 0,
+ (cconsts.NETWORK_CB_KEY_ID, init_req_key))
+ else:
+ # Initiator & Responder: wait for network formation
+ init_net_event = autils.wait_for_event_with_keys(
+ init_dut, cconsts.EVENT_NETWORK_CALLBACK,
+ autils.EVENT_NDP_TIMEOUT,
+ (cconsts.NETWORK_CB_KEY_EVENT,
+ cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED),
+ (cconsts.NETWORK_CB_KEY_ID, init_req_key))
+ resp_net_event = autils.wait_for_event_with_keys(
+ resp_dut, cconsts.EVENT_NETWORK_CALLBACK,
+ autils.EVENT_NDP_TIMEOUT,
+ (cconsts.NETWORK_CB_KEY_EVENT,
+ cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED),
+ (cconsts.NETWORK_CB_KEY_ID, resp_req_key))
- init_aware_if = init_net_event["data"][
- cconsts.NETWORK_CB_KEY_INTERFACE_NAME]
- resp_aware_if = resp_net_event["data"][
- cconsts.NETWORK_CB_KEY_INTERFACE_NAME]
- self.log.info("Interface names: I=%s, R=%s", init_aware_if, resp_aware_if)
+ init_aware_if = init_net_event["data"][
+ cconsts.NETWORK_CB_KEY_INTERFACE_NAME]
+ resp_aware_if = resp_net_event["data"][
+ cconsts.NETWORK_CB_KEY_INTERFACE_NAME]
+ self.log.info("Interface names: I=%s, R=%s", init_aware_if, resp_aware_if)
- init_ipv6 = init_dut.droid.connectivityGetLinkLocalIpv6Address(
- init_aware_if).split("%")[0]
- resp_ipv6 = resp_dut.droid.connectivityGetLinkLocalIpv6Address(
- resp_aware_if).split("%")[0]
- self.log.info("Interface addresses (IPv6): I=%s, R=%s", init_ipv6,
- resp_ipv6)
+ init_ipv6 = init_dut.droid.connectivityGetLinkLocalIpv6Address(
+ init_aware_if).split("%")[0]
+ resp_ipv6 = resp_dut.droid.connectivityGetLinkLocalIpv6Address(
+ resp_aware_if).split("%")[0]
+ self.log.info("Interface addresses (IPv6): I=%s, R=%s", init_ipv6,
+ resp_ipv6)
- # TODO: possibly send messages back and forth, prefer to use netcat/nc
+ # TODO: possibly send messages back and forth, prefer to use netcat/nc
- # terminate sessions and wait for ON_LOST callbacks
- init_dut.droid.wifiAwareDestroy(init_id)
- resp_dut.droid.wifiAwareDestroy(resp_id)
+ # terminate sessions and wait for ON_LOST callbacks
+ init_dut.droid.wifiAwareDestroy(init_id)
+ resp_dut.droid.wifiAwareDestroy(resp_id)
- autils.wait_for_event_with_keys(
- init_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT,
- (cconsts.NETWORK_CB_KEY_EVENT,
- cconsts.NETWORK_CB_LOST), (cconsts.NETWORK_CB_KEY_ID, init_req_key))
- autils.wait_for_event_with_keys(
- resp_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT,
- (cconsts.NETWORK_CB_KEY_EVENT,
- cconsts.NETWORK_CB_LOST), (cconsts.NETWORK_CB_KEY_ID, resp_req_key))
+ autils.wait_for_event_with_keys(
+ init_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT,
+ (cconsts.NETWORK_CB_KEY_EVENT,
+ cconsts.NETWORK_CB_LOST), (cconsts.NETWORK_CB_KEY_ID, init_req_key))
+ autils.wait_for_event_with_keys(
+ resp_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT,
+ (cconsts.NETWORK_CB_KEY_EVENT,
+ cconsts.NETWORK_CB_LOST), (cconsts.NETWORK_CB_KEY_ID, resp_req_key))
# clean-up
resp_dut.droid.connectivityUnregisterNetworkCallback(resp_req_key)
@@ -638,7 +662,7 @@
# or different from the primary session.
# pub_type: Type of publish discovery session: unsolicited or solicited.
# sub_type: Type of subscribe discovery session: passive or active.
- # encr_type: Encription type: open, passphrase
+ # encr_type: Encryption type: open, passphrase
# peer_spec: Peer specification method: any or specific
#
# Note: In-Band means using Wi-Fi Aware for discovery and referring to the
@@ -715,7 +739,7 @@
# names is: test_oob_<encr_type>_<peer_spec>
# where:
#
- # encr_type: Encription type: open, passphrase
+ # encr_type: Encryption type: open, passphrase
# peer_spec: Peer specification method: any or specific
#
# Optionally set up an extra discovery session to test coexistence. If so
@@ -1302,41 +1326,41 @@
@test_tracker_info(uuid="2d728163-11cc-46ba-a973-c8e1e71397fc")
def test_multiple_ndi_open_passphrase(self):
- """Verify that can between 2 DUTs can create 2 NDPs with different security
+ """Verify that between 2 DUTs can create 2 NDPs with different security
configuration (one open, one using passphrase). The result should use two
different NDIs"""
self.run_multiple_ndi([None, self.PASSPHRASE])
@test_tracker_info(uuid="5f2c32aa-20b2-41f0-8b1e-d0b68df73ada")
def test_multiple_ndi_open_pmk(self):
- """Verify that can between 2 DUTs can create 2 NDPs with different security
+ """Verify that between 2 DUTs can create 2 NDPs with different security
configuration (one open, one using pmk). The result should use two
different NDIs"""
self.run_multiple_ndi([None, self.PMK])
@test_tracker_info(uuid="34467659-bcfb-40cd-ba25-7e50560fca63")
def test_multiple_ndi_passphrase_pmk(self):
- """Verify that can between 2 DUTs can create 2 NDPs with different security
+ """Verify that between 2 DUTs can create 2 NDPs with different security
configuration (one using passphrase, one using pmk). The result should use
two different NDIs"""
self.run_multiple_ndi([self.PASSPHRASE, self.PMK])
@test_tracker_info(uuid="d9194ce6-45b6-41b1-9cc8-ada79968966d")
def test_multiple_ndi_passphrases(self):
- """Verify that can between 2 DUTs can create 2 NDPs with different security
+ """Verify that between 2 DUTs can create 2 NDPs with different security
configuration (using different passphrases). The result should use two
different NDIs"""
self.run_multiple_ndi([self.PASSPHRASE, self.PASSPHRASE2])
@test_tracker_info(uuid="879df795-62d2-40d4-a862-bd46d8f7e67f")
def test_multiple_ndi_pmks(self):
- """Verify that can between 2 DUTs can create 2 NDPs with different security
+ """Verify that between 2 DUTs can create 2 NDPs with different security
configuration (using different PMKS). The result should use two different
NDIs"""
self.run_multiple_ndi([self.PMK, self.PMK2])
def test_multiple_ndi_open_passphrase_flip(self):
- """Verify that can between 2 DUTs can create 2 NDPs with different security
+ """Verify that between 2 DUTs can create 2 NDPs with different security
configuration (one open, one using passphrase). The result should use two
different NDIs.
@@ -1345,7 +1369,7 @@
self.run_multiple_ndi([None, self.PASSPHRASE], flip_init_resp=True)
def test_multiple_ndi_open_pmk_flip(self):
- """Verify that can between 2 DUTs can create 2 NDPs with different security
+ """Verify that between 2 DUTs can create 2 NDPs with different security
configuration (one open, one using pmk). The result should use two
different NDIs
@@ -1354,7 +1378,7 @@
self.run_multiple_ndi([None, self.PMK], flip_init_resp=True)
def test_multiple_ndi_passphrase_pmk_flip(self):
- """Verify that can between 2 DUTs can create 2 NDPs with different security
+ """Verify that between 2 DUTs can create 2 NDPs with different security
configuration (one using passphrase, one using pmk). The result should use
two different NDIs
@@ -1363,7 +1387,7 @@
self.run_multiple_ndi([self.PASSPHRASE, self.PMK], flip_init_resp=True)
def test_multiple_ndi_passphrases_flip(self):
- """Verify that can between 2 DUTs can create 2 NDPs with different security
+ """Verify that between 2 DUTs can create 2 NDPs with different security
configuration (using different passphrases). The result should use two
different NDIs
@@ -1373,10 +1397,417 @@
flip_init_resp=True)
def test_multiple_ndi_pmks_flip(self):
- """Verify that can between 2 DUTs can create 2 NDPs with different security
+ """Verify that between 2 DUTs can create 2 NDPs with different security
configuration (using different PMKS). The result should use two different
NDIs
Flip Initiator and Responder roles.
"""
self.run_multiple_ndi([self.PMK, self.PMK2], flip_init_resp=True)
+
+ #######################################
+
+ def test_ib_responder_any_usage(self):
+ """Verify that configuring an in-band (Aware discovery) Responder to receive
+ an NDP request from any peer is not permitted by current API level. Override
+ API check to validate that possible (i.e. that failure at current API level
+ is due to an API check and not some underlying failure).
+ """
+
+ # configure all devices to override API check and allow a Responder from ANY
+ for ad in self.android_devices:
+ autils.configure_ndp_allow_any_override(ad, True)
+ self.run_ib_data_path_test(
+ ptype=aconsts.PUBLISH_TYPE_UNSOLICITED,
+ stype=aconsts.SUBSCRIBE_TYPE_PASSIVE,
+ encr_type=self.ENCR_TYPE_OPEN,
+ use_peer_id=False)
+
+ # configure all devices to respect API check - i.e. disallow a Responder
+ # from ANY
+ for ad in self.android_devices:
+ autils.configure_ndp_allow_any_override(ad, False)
+ self.run_ib_data_path_test(
+ ptype=aconsts.PUBLISH_TYPE_UNSOLICITED,
+ stype=aconsts.SUBSCRIBE_TYPE_PASSIVE,
+ encr_type=self.ENCR_TYPE_OPEN,
+ use_peer_id=False,
+ expect_failure=True)
+
+ def test_oob_responder_any_usage(self):
+ """Verify that configuring an out-of-band (Aware discovery) Responder to
+ receive an NDP request from any peer is not permitted by current API level.
+ Override API check to validate that possible (i.e. that failure at current
+ API level is due to an API check and not some underlying failure).
+ """
+
+ # configure all devices to override API check and allow a Responder from ANY
+ for ad in self.android_devices:
+ autils.configure_ndp_allow_any_override(ad, True)
+ self.run_oob_data_path_test(
+ encr_type=self.ENCR_TYPE_OPEN,
+ use_peer_id=False)
+
+ # configure all devices to respect API check - i.e. disallow a Responder
+ # from ANY
+ for ad in self.android_devices:
+ autils.configure_ndp_allow_any_override(ad, False)
+ self.run_oob_data_path_test(
+ encr_type=self.ENCR_TYPE_OPEN,
+ use_peer_id=False,
+ expect_failure=True)
+
+ #######################################
+
+ def run_multiple_regulatory_domains(self, use_ib, init_domain, resp_domain):
+ """Verify that a data-path setup with two conflicting regulatory domains
+ works (the result should be run in Channel 6 - but that is not tested).
+
+ Args:
+ use_ib: True to use in-band discovery, False to use out-of-band discovery.
+ init_domain: The regulatory domain of the Initiator/Subscriber.
+ resp_domain: The regulator domain of the Responder/Publisher.
+ """
+ init_dut = self.android_devices[0]
+ resp_dut = self.android_devices[1]
+
+ utils.set_regulatory_domain(init_dut, init_domain)
+ utils.set_regulatory_domain(resp_dut, resp_domain)
+
+ if use_ib:
+ (resp_req_key, init_req_key, resp_aware_if, init_aware_if, resp_ipv6,
+ init_ipv6) = autils.create_ib_ndp(resp_dut, init_dut,
+ autils.create_discovery_config(
+ "GoogleTestXyz",
+ aconsts.PUBLISH_TYPE_UNSOLICITED),
+ autils.create_discovery_config(
+ "GoogleTestXyz",
+ aconsts.SUBSCRIBE_TYPE_PASSIVE),
+ self.device_startup_offset)
+ else:
+ (init_req_key, resp_req_key, init_aware_if, resp_aware_if, init_ipv6,
+ resp_ipv6) = autils.create_oob_ndp(init_dut, resp_dut)
+
+ self.log.info("Interface names: I=%s, R=%s", init_aware_if, resp_aware_if)
+ self.log.info("Interface addresses (IPv6): I=%s, R=%s", init_ipv6,
+ resp_ipv6)
+
+ # clean-up
+ resp_dut.droid.connectivityUnregisterNetworkCallback(resp_req_key)
+ init_dut.droid.connectivityUnregisterNetworkCallback(init_req_key)
+
+ def test_multiple_regulator_domains_ib_us_jp(self):
+ """Verify data-path setup across multiple regulator domains.
+
+ - Uses in-band discovery
+ - Subscriber=US, Publisher=JP
+ """
+ self.run_multiple_regulatory_domains(use_ib=True,
+ init_domain="US",
+ resp_domain="JP")
+
+ def test_multiple_regulator_domains_ib_jp_us(self):
+ """Verify data-path setup across multiple regulator domains.
+
+ - Uses in-band discovery
+ - Subscriber=JP, Publisher=US
+ """
+ self.run_multiple_regulatory_domains(use_ib=True,
+ init_domain="JP",
+ resp_domain="US")
+
+ def test_multiple_regulator_domains_oob_us_jp(self):
+ """Verify data-path setup across multiple regulator domains.
+
+ - Uses out-f-band discovery
+ - Initiator=US, Responder=JP
+ """
+ self.run_multiple_regulatory_domains(use_ib=False,
+ init_domain="US",
+ resp_domain="JP")
+
+ def test_multiple_regulator_domains_oob_jp_us(self):
+ """Verify data-path setup across multiple regulator domains.
+
+ - Uses out-of-band discovery
+ - Initiator=JP, Responder=US
+ """
+ self.run_multiple_regulatory_domains(use_ib=False,
+ init_domain="JP",
+ resp_domain="US")
+
+ ########################################################################
+
+ def run_mix_ib_oob(self, same_request, ib_first, inits_on_same_dut):
+ """Validate that multiple network requests issued using both in-band and
+ out-of-band discovery behave as expected.
+
+ The same_request parameter controls whether identical single NDP is
+ expected, if True, or whether multiple NDPs on different NDIs are expected,
+ if False.
+
+ Args:
+ same_request: Issue canonically identical requests (same NMI peer, same
+ passphrase) if True, if False use different passphrases.
+ ib_first: If True then the in-band network is requested first, otherwise
+ (if False) then the out-of-band network is requested first.
+ inits_on_same_dut: If True then the Initiators are run on the same device,
+ otherwise (if False) then the Initiators are run on
+ different devices. Note that Subscribe == Initiator.
+ """
+ if not same_request:
+ asserts.skip_if(self.android_devices[0].aware_capabilities[
+ aconsts.CAP_MAX_NDI_INTERFACES] < 2 or
+ self.android_devices[1].aware_capabilities[
+ aconsts.CAP_MAX_NDI_INTERFACES] < 2,
+ "DUTs do not support enough NDIs")
+
+ (p_dut, s_dut, p_id, s_id, p_disc_id, s_disc_id, peer_id_on_sub,
+ peer_id_on_pub_null) = self.set_up_discovery(
+ aconsts.PUBLISH_TYPE_UNSOLICITED, aconsts.SUBSCRIBE_TYPE_PASSIVE, False)
+
+ p_id2, p_mac = autils.attach_with_identity(p_dut)
+ s_id2, s_mac = autils.attach_with_identity(s_dut)
+
+ if inits_on_same_dut:
+ resp_dut = p_dut
+ resp_id = p_id2
+ resp_mac = p_mac
+
+ init_dut = s_dut
+ init_id = s_id2
+ init_mac = s_mac
+ else:
+ resp_dut = s_dut
+ resp_id = s_id2
+ resp_mac = s_mac
+
+ init_dut = p_dut
+ init_id = p_id2
+ init_mac = p_mac
+
+ passphrase = None if same_request else self.PASSPHRASE
+
+ if ib_first:
+ # request in-band network (to completion)
+ p_req_key = self.request_network(
+ p_dut,
+ p_dut.droid.wifiAwareCreateNetworkSpecifier(p_disc_id, None))
+ s_req_key = self.request_network(
+ s_dut,
+ s_dut.droid.wifiAwareCreateNetworkSpecifier(s_disc_id,
+ peer_id_on_sub))
+
+ # Publisher & Subscriber: wait for network formation
+ p_net_event = autils.wait_for_event_with_keys(
+ p_dut, cconsts.EVENT_NETWORK_CALLBACK,
+ autils.EVENT_NDP_TIMEOUT,
+ (cconsts.NETWORK_CB_KEY_EVENT,
+ cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED),
+ (cconsts.NETWORK_CB_KEY_ID, p_req_key))
+ s_net_event = autils.wait_for_event_with_keys(
+ s_dut, cconsts.EVENT_NETWORK_CALLBACK,
+ autils.EVENT_NDP_TIMEOUT,
+ (cconsts.NETWORK_CB_KEY_EVENT,
+ cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED),
+ (cconsts.NETWORK_CB_KEY_ID, s_req_key))
+
+ # request out-of-band network
+ resp_req_key = autils.request_network(resp_dut,
+ resp_dut.droid.wifiAwareCreateNetworkSpecifierOob(
+ resp_id, aconsts.DATA_PATH_RESPONDER, init_mac, passphrase))
+ init_req_key = autils.request_network(init_dut,
+ init_dut.droid.wifiAwareCreateNetworkSpecifierOob(
+ init_id, aconsts.DATA_PATH_INITIATOR, resp_mac, passphrase))
+
+ resp_net_event = autils.wait_for_event_with_keys(
+ resp_dut, cconsts.EVENT_NETWORK_CALLBACK,
+ autils.EVENT_NDP_TIMEOUT,
+ (cconsts.NETWORK_CB_KEY_EVENT,
+ cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED),
+ (cconsts.NETWORK_CB_KEY_ID, resp_req_key))
+ init_net_event = autils.wait_for_event_with_keys(
+ init_dut, cconsts.EVENT_NETWORK_CALLBACK,
+ autils.EVENT_NDP_TIMEOUT,
+ (cconsts.NETWORK_CB_KEY_EVENT,
+ cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED),
+ (cconsts.NETWORK_CB_KEY_ID, init_req_key))
+
+ if not ib_first:
+ # request in-band network (to completion)
+ p_req_key = self.request_network(
+ p_dut,
+ p_dut.droid.wifiAwareCreateNetworkSpecifier(p_disc_id, None))
+ s_req_key = self.request_network(
+ s_dut,
+ s_dut.droid.wifiAwareCreateNetworkSpecifier(s_disc_id,
+ peer_id_on_sub))
+
+ # Publisher & Subscriber: wait for network formation
+ p_net_event = autils.wait_for_event_with_keys(
+ p_dut, cconsts.EVENT_NETWORK_CALLBACK,
+ autils.EVENT_NDP_TIMEOUT,
+ (cconsts.NETWORK_CB_KEY_EVENT,
+ cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED),
+ (cconsts.NETWORK_CB_KEY_ID, p_req_key))
+ s_net_event = autils.wait_for_event_with_keys(
+ s_dut, cconsts.EVENT_NETWORK_CALLBACK,
+ autils.EVENT_NDP_TIMEOUT,
+ (cconsts.NETWORK_CB_KEY_EVENT,
+ cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED),
+ (cconsts.NETWORK_CB_KEY_ID, s_req_key))
+
+ # extract net info
+ pub_interface = p_net_event["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME]
+ sub_interface = s_net_event["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME]
+ resp_interface = resp_net_event["data"][
+ cconsts.NETWORK_CB_KEY_INTERFACE_NAME]
+ init_interface = init_net_event["data"][
+ cconsts.NETWORK_CB_KEY_INTERFACE_NAME]
+
+ self.log.info(
+ "Interface names: Pub=%s, Sub=%s, Resp=%s, Init=%s", pub_interface,
+ sub_interface, resp_interface, init_interface)
+
+ pub_ipv6 = \
+ p_dut.droid.connectivityGetLinkLocalIpv6Address(pub_interface).split("%")[0]
+ sub_ipv6 = \
+ s_dut.droid.connectivityGetLinkLocalIpv6Address(sub_interface).split("%")[0]
+ resp_ipv6 = \
+ resp_dut.droid.connectivityGetLinkLocalIpv6Address(resp_interface).split(
+ "%")[0]
+ init_ipv6 = \
+ init_dut.droid.connectivityGetLinkLocalIpv6Address(init_interface).split(
+ "%")[0]
+
+ self.log.info(
+ "Interface addresses (IPv6): Pub=%s, Sub=%s, Resp=%s, Init=%s", pub_ipv6,
+ sub_ipv6, resp_ipv6, init_ipv6)
+
+ # validate NDP/NDI conditions (using interface names & ipv6)
+ if same_request:
+ asserts.assert_equal(pub_interface,
+ resp_interface if inits_on_same_dut else init_interface,
+ "NDP interfaces don't match on Pub/other")
+ asserts.assert_equal(sub_interface,
+ init_interface if inits_on_same_dut else resp_interface,
+ "NDP interfaces don't match on Sub/other")
+
+ asserts.assert_equal(pub_ipv6,
+ resp_ipv6 if inits_on_same_dut else init_ipv6,
+ "NDP IPv6 don't match on Pub/other")
+ asserts.assert_equal(sub_ipv6,
+ init_ipv6 if inits_on_same_dut else resp_ipv6,
+ "NDP IPv6 don't match on Sub/other")
+ else:
+ asserts.assert_false(pub_interface == (
+ resp_interface if inits_on_same_dut else init_interface),
+ "NDP interfaces match on Pub/other")
+ asserts.assert_false(sub_interface == (
+ init_interface if inits_on_same_dut else resp_interface),
+ "NDP interfaces match on Sub/other")
+
+ asserts.assert_false(pub_ipv6 ==
+ (resp_ipv6 if inits_on_same_dut else init_ipv6),
+ "NDP IPv6 match on Pub/other")
+ asserts.assert_false(sub_ipv6 ==
+ (init_ipv6 if inits_on_same_dut else resp_ipv6),
+ "NDP IPv6 match on Sub/other")
+
+ # release requests
+ p_dut.droid.connectivityUnregisterNetworkCallback(p_req_key)
+ s_dut.droid.connectivityUnregisterNetworkCallback(s_req_key)
+ resp_dut.droid.connectivityUnregisterNetworkCallback(resp_req_key)
+ init_dut.droid.connectivityUnregisterNetworkCallback(init_req_key)
+
+ def test_identical_ndps_mix_ib_oob_ib_first_same_polarity(self):
+ """Validate that a single NDP is created for multiple identical requests
+ which are issued through either in-band (ib) or out-of-band (oob) APIs.
+
+ The in-band request is issued first. Both Initiators (Sub == Initiator) are
+ run on the same device.
+ """
+ self.run_mix_ib_oob(same_request=True,
+ ib_first=True,
+ inits_on_same_dut=True)
+
+ def test_identical_ndps_mix_ib_oob_oob_first_same_polarity(self):
+ """Validate that a single NDP is created for multiple identical requests
+ which are issued through either in-band (ib) or out-of-band (oob) APIs.
+
+ The out-of-band request is issued first. Both Initiators (Sub == Initiator)
+ are run on the same device.
+ """
+ self.run_mix_ib_oob(same_request=True,
+ ib_first=False,
+ inits_on_same_dut=True)
+
+ def test_identical_ndps_mix_ib_oob_ib_first_diff_polarity(self):
+ """Validate that a single NDP is created for multiple identical requests
+ which are issued through either in-band (ib) or out-of-band (oob) APIs.
+
+ The in-band request is issued first. Initiators (Sub == Initiator) are
+ run on different devices.
+ """
+ self.run_mix_ib_oob(same_request=True,
+ ib_first=True,
+ inits_on_same_dut=False)
+
+ def test_identical_ndps_mix_ib_oob_oob_first_diff_polarity(self):
+ """Validate that a single NDP is created for multiple identical requests
+ which are issued through either in-band (ib) or out-of-band (oob) APIs.
+
+ The out-of-band request is issued first. Initiators (Sub == Initiator) are
+ run on different devices.
+ """
+ self.run_mix_ib_oob(same_request=True,
+ ib_first=False,
+ inits_on_same_dut=False)
+
+ def test_multiple_ndis_mix_ib_oob_ib_first_same_polarity(self):
+ """Validate that multiple NDIs are created for NDPs which are requested with
+ different security configurations. Use a mix of in-band and out-of-band APIs
+ to request the different NDPs.
+
+ The in-band request is issued first. Initiators (Sub == Initiator) are
+ run on the same device.
+ """
+ self.run_mix_ib_oob(same_request=False,
+ ib_first=True,
+ inits_on_same_dut=True)
+
+ def test_multiple_ndis_mix_ib_oob_oob_first_same_polarity(self):
+ """Validate that multiple NDIs are created for NDPs which are requested with
+ different security configurations. Use a mix of in-band and out-of-band APIs
+ to request the different NDPs.
+
+ The out-of-band request is issued first. Initiators (Sub == Initiator) are
+ run on the same device.
+ """
+ self.run_mix_ib_oob(same_request=False,
+ ib_first=False,
+ inits_on_same_dut=True)
+
+ def test_multiple_ndis_mix_ib_oob_ib_first_diff_polarity(self):
+ """Validate that multiple NDIs are created for NDPs which are requested with
+ different security configurations. Use a mix of in-band and out-of-band APIs
+ to request the different NDPs.
+
+ The in-band request is issued first. Initiators (Sub == Initiator) are
+ run on different devices.
+ """
+ self.run_mix_ib_oob(same_request=False,
+ ib_first=True,
+ inits_on_same_dut=False)
+
+ def test_multiple_ndis_mix_ib_oob_oob_first_diff_polarity(self):
+ """Validate that multiple NDIs are created for NDPs which are requested with
+ different security configurations. Use a mix of in-band and out-of-band APIs
+ to request the different NDPs.
+
+ The out-of-band request is issued first. Initiators (Sub == Initiator) are
+ run on different devices.
+ """
+ self.run_mix_ib_oob(same_request=False,
+ ib_first=False,
+ inits_on_same_dut=False)
diff --git a/acts/tests/google/wifi/aware/functional/DiscoveryTest.py b/acts/tests/google/wifi/aware/functional/DiscoveryTest.py
index 1784d12..9182f39 100644
--- a/acts/tests/google/wifi/aware/functional/DiscoveryTest.py
+++ b/acts/tests/google/wifi/aware/functional/DiscoveryTest.py
@@ -831,7 +831,7 @@
s_mf_1="goodbye there string")
#######################################
- # Multiple concurrent services key
+ # Multiple concurrent services
#######################################
def run_multiple_concurrent_services(self, type_x, type_y):
@@ -1008,3 +1008,24 @@
self.run_multiple_concurrent_services(
type_x=[aconsts.PUBLISH_TYPE_UNSOLICITED, aconsts.SUBSCRIBE_TYPE_PASSIVE],
type_y=[aconsts.PUBLISH_TYPE_SOLICITED, aconsts.SUBSCRIBE_TYPE_ACTIVE])
+
+ #########################################################
+
+ def test_upper_lower_service_name_equivalence(self):
+ """Validate that Service Name is case-insensitive. Publish a service name
+ with mixed case, subscribe to the same service name with alternative case
+ and verify that discovery happens."""
+ p_dut = self.android_devices[0]
+ s_dut = self.android_devices[1]
+
+ pub_service_name = "GoogleAbCdEf"
+ sub_service_name = "GoogleaBcDeF"
+
+ autils.create_discovery_pair(p_dut, s_dut,
+ p_config=autils.create_discovery_config(
+ pub_service_name,
+ aconsts.PUBLISH_TYPE_UNSOLICITED),
+ s_config=autils.create_discovery_config(
+ sub_service_name,
+ aconsts.SUBSCRIBE_TYPE_PASSIVE),
+ device_startup_offset=self.device_startup_offset)
diff --git a/acts/tests/google/wifi/aware/performance/LatencyTest.py b/acts/tests/google/wifi/aware/performance/LatencyTest.py
index bde9ff4..9f2a5bf 100644
--- a/acts/tests/google/wifi/aware/performance/LatencyTest.py
+++ b/acts/tests/google/wifi/aware/performance/LatencyTest.py
@@ -92,8 +92,8 @@
s_dut.pretty_name = "Subscriber"
# override the default DW configuration
- autils.config_dw_all_modes(p_dut, dw_24ghz, dw_5ghz)
- autils.config_dw_all_modes(s_dut, dw_24ghz, dw_5ghz)
+ autils.config_power_settings(p_dut, dw_24ghz, dw_5ghz)
+ autils.config_power_settings(s_dut, dw_24ghz, dw_5ghz)
latencies = []
failed_discoveries = 0
@@ -174,8 +174,8 @@
s_dut.pretty_name = "Subscriber"
# override the default DW configuration
- autils.config_dw_all_modes(p_dut, dw_24ghz, dw_5ghz)
- autils.config_dw_all_modes(s_dut, dw_24ghz, dw_5ghz)
+ autils.config_power_settings(p_dut, dw_24ghz, dw_5ghz)
+ autils.config_power_settings(s_dut, dw_24ghz, dw_5ghz)
# Publisher+Subscriber: attach and wait for confirmation
p_id = p_dut.droid.wifiAwareAttach(False)
@@ -253,8 +253,8 @@
s_dut = self.android_devices[1]
# override the default DW configuration
- autils.config_dw_all_modes(p_dut, dw_24ghz, dw_5ghz)
- autils.config_dw_all_modes(s_dut, dw_24ghz, dw_5ghz)
+ autils.config_power_settings(p_dut, dw_24ghz, dw_5ghz)
+ autils.config_power_settings(s_dut, dw_24ghz, dw_5ghz)
# Start up a discovery session
(p_id, s_id, p_disc_id, s_disc_id,
@@ -341,8 +341,8 @@
resp_dut.pretty_name = 'Responder'
# override the default DW configuration
- autils.config_dw_all_modes(init_dut, dw_24ghz, dw_5ghz)
- autils.config_dw_all_modes(resp_dut, dw_24ghz, dw_5ghz)
+ autils.config_power_settings(init_dut, dw_24ghz, dw_5ghz)
+ autils.config_power_settings(resp_dut, dw_24ghz, dw_5ghz)
# Initiator+Responder: attach and wait for confirmation & identity
init_id = init_dut.droid.wifiAwareAttach(True)
@@ -438,8 +438,8 @@
self.run_synchronization_latency(
results=results,
do_unsolicited_passive=True,
- dw_24ghz=aconsts.DW_24_INTERACTIVE,
- dw_5ghz=aconsts.DW_5_INTERACTIVE,
+ dw_24ghz=aconsts.POWER_DW_24_INTERACTIVE,
+ dw_5ghz=aconsts.POWER_DW_5_INTERACTIVE,
num_iterations=10,
startup_offset=startup_offset,
timeout_period=20)
@@ -454,8 +454,8 @@
self.run_synchronization_latency(
results=results,
do_unsolicited_passive=True,
- dw_24ghz=aconsts.DW_24_NON_INTERACTIVE,
- dw_5ghz=aconsts.DW_5_NON_INTERACTIVE,
+ dw_24ghz=aconsts.POWER_DW_24_NON_INTERACTIVE,
+ dw_5ghz=aconsts.POWER_DW_5_NON_INTERACTIVE,
num_iterations=10,
startup_offset=startup_offset,
timeout_period=20)
@@ -469,8 +469,8 @@
self.run_discovery_latency(
results=results,
do_unsolicited_passive=True,
- dw_24ghz=aconsts.DW_24_INTERACTIVE,
- dw_5ghz=aconsts.DW_5_INTERACTIVE,
+ dw_24ghz=aconsts.POWER_DW_24_INTERACTIVE,
+ dw_5ghz=aconsts.POWER_DW_5_INTERACTIVE,
num_iterations=100)
asserts.explicit_pass(
"test_discovery_latency_default_parameters finished", extras=results)
@@ -482,8 +482,8 @@
self.run_discovery_latency(
results=results,
do_unsolicited_passive=True,
- dw_24ghz=aconsts.DW_24_NON_INTERACTIVE,
- dw_5ghz=aconsts.DW_5_NON_INTERACTIVE,
+ dw_24ghz=aconsts.POWER_DW_24_NON_INTERACTIVE,
+ dw_5ghz=aconsts.POWER_DW_5_NON_INTERACTIVE,
num_iterations=100)
asserts.explicit_pass(
"test_discovery_latency_non_interactive_dws finished", extras=results)
@@ -510,8 +510,8 @@
results = {}
self.run_message_latency(
results=results,
- dw_24ghz=aconsts.DW_24_INTERACTIVE,
- dw_5ghz=aconsts.DW_5_INTERACTIVE,
+ dw_24ghz=aconsts.POWER_DW_24_INTERACTIVE,
+ dw_5ghz=aconsts.POWER_DW_5_INTERACTIVE,
num_iterations=100)
asserts.explicit_pass(
"test_message_latency_default_dws finished", extras=results)
@@ -524,8 +524,8 @@
results = {}
self.run_message_latency(
results=results,
- dw_24ghz=aconsts.DW_24_NON_INTERACTIVE,
- dw_5ghz=aconsts.DW_5_NON_INTERACTIVE,
+ dw_24ghz=aconsts.POWER_DW_24_NON_INTERACTIVE,
+ dw_5ghz=aconsts.POWER_DW_5_NON_INTERACTIVE,
num_iterations=100)
asserts.explicit_pass(
"test_message_latency_non_interactive_dws finished", extras=results)
@@ -536,8 +536,8 @@
results = {}
self.run_ndp_oob_latency(
results=results,
- dw_24ghz=aconsts.DW_24_INTERACTIVE,
- dw_5ghz=aconsts.DW_5_INTERACTIVE,
+ dw_24ghz=aconsts.POWER_DW_24_INTERACTIVE,
+ dw_5ghz=aconsts.POWER_DW_5_INTERACTIVE,
num_iterations=100)
asserts.explicit_pass(
"test_ndp_setup_latency_default_dws finished", extras=results)
@@ -549,8 +549,8 @@
results = {}
self.run_ndp_oob_latency(
results=results,
- dw_24ghz=aconsts.DW_24_NON_INTERACTIVE,
- dw_5ghz=aconsts.DW_5_NON_INTERACTIVE,
+ dw_24ghz=aconsts.POWER_DW_24_NON_INTERACTIVE,
+ dw_5ghz=aconsts.POWER_DW_5_NON_INTERACTIVE,
num_iterations=100)
asserts.explicit_pass(
"test_ndp_setup_latency_non_interactive_dws finished", extras=results)
diff --git a/acts/tests/google/wifi/aware/performance/ThroughputTest.py b/acts/tests/google/wifi/aware/performance/ThroughputTest.py
index 6cf1046..7ee6e08 100644
--- a/acts/tests/google/wifi/aware/performance/ThroughputTest.py
+++ b/acts/tests/google/wifi/aware/performance/ThroughputTest.py
@@ -36,7 +36,7 @@
PASSPHRASE2 = "This is some random passphrase - very very secure - but diff!!"
def __init__(self, controllers):
- AwareBaseTest.__init__(self, controllers)
+ super(ThroughputTest, self).__init__(controllers)
def request_network(self, dut, ns):
"""Request a Wi-Fi Aware network.
diff --git a/acts/tests/google/wifi/rtt/config/wifi_rtt.json b/acts/tests/google/wifi/rtt/config/wifi_rtt.json
new file mode 100644
index 0000000..538563b
--- /dev/null
+++ b/acts/tests/google/wifi/rtt/config/wifi_rtt.json
@@ -0,0 +1,26 @@
+{
+ "_description": "This is a test configuration file for Wi-Fi RTT tests.",
+ "testbed":
+ [
+ {
+ "_description": "Wi-Fi RTT testbed: auto-detect all attached devices",
+ "name": "WifiRttAllAttached",
+ "AndroidDevice": "*"
+ }
+ ],
+ "logpath": "~/logs",
+ "testpaths": ["./tools/test/connectivity/acts/tests/google/wifi"],
+ "adb_logcat_param": "-b all",
+ "aware_default_power_mode": "INTERACTIVE",
+ "lci_reference": [],
+ "lcr_reference": [],
+ "rtt_reference_distance_mm": 100,
+ "rtt_reference_distance_margin_mm": 1000,
+ "rtt_max_failure_rate_two_sided_rtt_percentage": 10,
+ "rtt_max_failure_rate_one_sided_rtt_percentage": 50,
+ "rtt_max_margin_exceeded_rate_two_sided_rtt_percentage": 10,
+ "rtt_max_margin_exceeded_rate_one_sided_rtt_percentage": 50,
+ "rtt_min_expected_rssi_dbm": -100,
+ "stress_test_min_iteration_count": 100,
+ "stress_test_target_run_time_sec" : 30
+}
diff --git a/acts/tests/google/wifi/rtt/functional/AwareDiscoveryWithRangingTest.py b/acts/tests/google/wifi/rtt/functional/AwareDiscoveryWithRangingTest.py
new file mode 100644
index 0000000..47282e1
--- /dev/null
+++ b/acts/tests/google/wifi/rtt/functional/AwareDiscoveryWithRangingTest.py
@@ -0,0 +1,474 @@
+#!/usr/bin/python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+
+from acts.test_utils.wifi.aware import aware_const as aconsts
+from acts.test_utils.wifi.aware import aware_test_utils as autils
+from acts.test_utils.wifi.aware.AwareBaseTest import AwareBaseTest
+from acts.test_utils.wifi.rtt.RttBaseTest import RttBaseTest
+
+
+class AwareDiscoveryWithRangingTest(AwareBaseTest, RttBaseTest):
+ """Set of tests for Wi-Fi Aware discovery configured with ranging (RTT)."""
+
+ SERVICE_NAME = "GoogleTestServiceRRRRR"
+
+ def __init__(self, controllers):
+ AwareBaseTest.__init__(self, controllers)
+ RttBaseTest.__init__(self, controllers)
+
+ def setup_test(self):
+ """Manual setup here due to multiple inheritance: explicitly execute the
+ setup method from both parents."""
+ AwareBaseTest.setup_test(self)
+ RttBaseTest.setup_test(self)
+
+ def teardown_test(self):
+ """Manual teardown here due to multiple inheritance: explicitly execute the
+ teardown method from both parents."""
+ AwareBaseTest.teardown_test(self)
+ RttBaseTest.teardown_test(self)
+
+ #########################################################################
+
+ def run_discovery(self, p_config, s_config, expect_discovery,
+ expect_range=False):
+ """Run discovery on the 2 input devices with the specified configurations.
+
+ Args:
+ p_config, s_config: Publisher and Subscriber discovery configuration.
+ expect_discovery: True or False indicating whether discovery is expected
+ with the specified configurations.
+ expect_range: True if we expect distance results (i.e. ranging to happen).
+ Only relevant if expect_discovery is True.
+ """
+ p_dut = self.android_devices[0]
+ p_dut.pretty_name = "Publisher"
+ s_dut = self.android_devices[1]
+ s_dut.pretty_name = "Subscriber"
+
+ # Publisher+Subscriber: attach and wait for confirmation
+ p_id = p_dut.droid.wifiAwareAttach(False)
+ autils.wait_for_event(p_dut, aconsts.EVENT_CB_ON_ATTACHED)
+ time.sleep(self.device_startup_offset)
+ s_id = s_dut.droid.wifiAwareAttach(False)
+ autils.wait_for_event(s_dut, aconsts.EVENT_CB_ON_ATTACHED)
+
+ # Publisher: start publish and wait for confirmation
+ p_disc_id = p_dut.droid.wifiAwarePublish(p_id, p_config)
+ autils.wait_for_event(p_dut, aconsts.SESSION_CB_ON_PUBLISH_STARTED)
+
+ # Subscriber: start subscribe and wait for confirmation
+ s_disc_id = s_dut.droid.wifiAwareSubscribe(s_id, s_config)
+ autils.wait_for_event(s_dut, aconsts.SESSION_CB_ON_SUBSCRIBE_STARTED)
+
+ # Subscriber: wait or fail on service discovery
+ if expect_discovery:
+ autils.wait_for_event(
+ s_dut,
+ aconsts.SESSION_CB_ON_SERVICE_DISCOVERED_WITHIN_RANGE if expect_range
+ else aconsts.SESSION_CB_ON_SERVICE_DISCOVERED)
+ else:
+ time.sleep(autils.EVENT_TIMEOUT) # single timeout for both events
+ autils.fail_on_event(s_dut, aconsts.SESSION_CB_ON_SERVICE_DISCOVERED,
+ timeout=0)
+ autils.fail_on_event(
+ s_dut,
+ aconsts.SESSION_CB_ON_SERVICE_DISCOVERED_WITHIN_RANGE,
+ timeout=0)
+
+ #########################################################################
+ # Run discovery with ranging configuration.
+ #
+ # Names: test_ranged_discovery_<ptype>_<stype>_<p_range>_<s_range>_<ref_dist>
+ #
+ # where:
+ # <ptype>_<stype>: unsolicited_passive or solicited_active
+ # <p_range>: prange or pnorange
+ # <s_range>: smin or smax or sminmax or snorange
+ # <ref_distance>: inrange or outoforange
+ #########################################################################
+
+ def test_ranged_discovery_unsolicited_passive_prange_snorange(self):
+ """Verify discovery with ranging:
+ - Unsolicited Publish/Passive Subscribe
+ - Publisher enables ranging
+ - Subscriber disables ranging
+
+ Expect: normal discovery (as if no ranging performed) - no distance
+ """
+ self.run_discovery(
+ p_config=autils.add_ranging_to_pub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.PUBLISH_TYPE_UNSOLICITED),
+ enable_ranging=True),
+ s_config=autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.SUBSCRIBE_TYPE_PASSIVE),
+ expect_discovery=True,
+ expect_range=False)
+
+ def test_ranged_discovery_solicited_active_prange_snorange(self):
+ """Verify discovery with ranging:
+ - Solicited Publish/Active Subscribe
+ - Publisher enables ranging
+ - Subscriber disables ranging
+
+ Expect: normal discovery (as if no ranging performed) - no distance
+ """
+ self.run_discovery(
+ p_config=autils.add_ranging_to_pub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.PUBLISH_TYPE_SOLICITED),
+ enable_ranging=True),
+ s_config=autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.SUBSCRIBE_TYPE_ACTIVE),
+ expect_discovery=True,
+ expect_range=False)
+
+ def test_ranged_discovery_unsolicited_passive_pnorange_smax_inrange(self):
+ """Verify discovery with ranging:
+ - Unsolicited Publish/Passive Subscribe
+ - Publisher disables ranging
+ - Subscriber enables ranging with max such that always within range (large
+ max)
+
+ Expect: no discovery
+ """
+ self.run_discovery(
+ p_config=autils.add_ranging_to_pub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.PUBLISH_TYPE_UNSOLICITED),
+ enable_ranging=False),
+ s_config=autils.add_ranging_to_sub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.SUBSCRIBE_TYPE_PASSIVE),
+ min_distance_mm=None,
+ max_distance_mm=1000000),
+ expect_discovery=False)
+
+ def test_ranged_discovery_solicited_active_pnorange_smax_inrange(self):
+ """Verify discovery with ranging:
+ - Solicited Publish/Active Subscribe
+ - Publisher disables ranging
+ - Subscriber enables ranging with max such that always within range (large
+ max)
+
+ Expect: no discovery
+ """
+ self.run_discovery(
+ p_config=autils.add_ranging_to_pub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.PUBLISH_TYPE_SOLICITED),
+ enable_ranging=False),
+ s_config=autils.add_ranging_to_sub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.SUBSCRIBE_TYPE_ACTIVE),
+ min_distance_mm=None,
+ max_distance_mm=1000000),
+ expect_discovery=False)
+
+ def test_ranged_discovery_unsolicited_passive_pnorange_smin_outofrange(self):
+ """Verify discovery with ranging:
+ - Unsolicited Publish/Passive Subscribe
+ - Publisher disables ranging
+ - Subscriber enables ranging with min such that always out of range (large
+ min)
+
+ Expect: no discovery
+ """
+ self.run_discovery(
+ p_config=autils.add_ranging_to_pub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.PUBLISH_TYPE_UNSOLICITED),
+ enable_ranging=False),
+ s_config=autils.add_ranging_to_sub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.SUBSCRIBE_TYPE_PASSIVE),
+ min_distance_mm=1000000,
+ max_distance_mm=None),
+ expect_discovery=False)
+
+ def test_ranged_discovery_solicited_active_pnorange_smin_outofrange(self):
+ """Verify discovery with ranging:
+ - Solicited Publish/Active Subscribe
+ - Publisher disables ranging
+ - Subscriber enables ranging with min such that always out of range (large
+ min)
+
+ Expect: no discovery
+ """
+ self.run_discovery(
+ p_config=autils.add_ranging_to_pub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.PUBLISH_TYPE_SOLICITED),
+ enable_ranging=False),
+ s_config=autils.add_ranging_to_sub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.SUBSCRIBE_TYPE_ACTIVE),
+ min_distance_mm=1000000,
+ max_distance_mm=None),
+ expect_discovery=False)
+
+ def test_ranged_discovery_unsolicited_passive_prange_smin_inrange(self):
+ """Verify discovery with ranging:
+ - Unsolicited Publish/Passive Subscribe
+ - Publisher enables ranging
+ - Subscriber enables ranging with min such that in range (min=0)
+
+ Expect: discovery with distance
+ """
+ self.run_discovery(
+ p_config=autils.add_ranging_to_pub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.PUBLISH_TYPE_UNSOLICITED),
+ enable_ranging=True),
+ s_config=autils.add_ranging_to_sub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.SUBSCRIBE_TYPE_PASSIVE),
+ min_distance_mm=0,
+ max_distance_mm=None),
+ expect_discovery=True,
+ expect_range=True)
+
+ def test_ranged_discovery_unsolicited_passive_prange_smax_inrange(self):
+ """Verify discovery with ranging:
+ - Unsolicited Publish/Passive Subscribe
+ - Publisher enables ranging
+ - Subscriber enables ranging with max such that in range (max=large)
+
+ Expect: discovery with distance
+ """
+ self.run_discovery(
+ p_config=autils.add_ranging_to_pub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.PUBLISH_TYPE_UNSOLICITED),
+ enable_ranging=True),
+ s_config=autils.add_ranging_to_sub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.SUBSCRIBE_TYPE_PASSIVE),
+ min_distance_mm=None,
+ max_distance_mm=1000000),
+ expect_discovery=True,
+ expect_range=True)
+
+ def test_ranged_discovery_unsolicited_passive_prange_sminmax_inrange(self):
+ """Verify discovery with ranging:
+ - Unsolicited Publish/Passive Subscribe
+ - Publisher enables ranging
+ - Subscriber enables ranging with min/max such that in range (min=0,
+ max=large)
+
+ Expect: discovery with distance
+ """
+ self.run_discovery(
+ p_config=autils.add_ranging_to_pub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.PUBLISH_TYPE_UNSOLICITED),
+ enable_ranging=True),
+ s_config=autils.add_ranging_to_sub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.SUBSCRIBE_TYPE_PASSIVE),
+ min_distance_mm=0,
+ max_distance_mm=1000000),
+ expect_discovery=True,
+ expect_range=True)
+
+ def test_ranged_discovery_solicited_active_prange_smin_inrange(self):
+ """Verify discovery with ranging:
+ - Solicited Publish/Active Subscribe
+ - Publisher enables ranging
+ - Subscriber enables ranging with min such that in range (min=0)
+
+ Expect: discovery with distance
+ """
+ self.run_discovery(
+ p_config=autils.add_ranging_to_pub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.PUBLISH_TYPE_SOLICITED),
+ enable_ranging=True),
+ s_config=autils.add_ranging_to_sub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.SUBSCRIBE_TYPE_ACTIVE),
+ min_distance_mm=0,
+ max_distance_mm=None),
+ expect_discovery=True,
+ expect_range=True)
+
+ def test_ranged_discovery_solicited_active_prange_smax_inrange(self):
+ """Verify discovery with ranging:
+ - Solicited Publish/Active Subscribe
+ - Publisher enables ranging
+ - Subscriber enables ranging with max such that in range (max=large)
+
+ Expect: discovery with distance
+ """
+ self.run_discovery(
+ p_config=autils.add_ranging_to_pub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.PUBLISH_TYPE_SOLICITED),
+ enable_ranging=True),
+ s_config=autils.add_ranging_to_sub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.SUBSCRIBE_TYPE_ACTIVE),
+ min_distance_mm=None,
+ max_distance_mm=1000000),
+ expect_discovery=True,
+ expect_range=True)
+
+ def test_ranged_discovery_solicited_active_prange_sminmax_inrange(self):
+ """Verify discovery with ranging:
+ - Solicited Publish/Active Subscribe
+ - Publisher enables ranging
+ - Subscriber enables ranging with min/max such that in range (min=0,
+ max=large)
+
+ Expect: discovery with distance
+ """
+ self.run_discovery(
+ p_config=autils.add_ranging_to_pub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.PUBLISH_TYPE_SOLICITED),
+ enable_ranging=True),
+ s_config=autils.add_ranging_to_sub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.SUBSCRIBE_TYPE_ACTIVE),
+ min_distance_mm=0,
+ max_distance_mm=1000000),
+ expect_discovery=True,
+ expect_range=True)
+
+ def test_ranged_discovery_unsolicited_passive_prange_smin_outofrange(self):
+ """Verify discovery with ranging:
+ - Unsolicited Publish/Passive Subscribe
+ - Publisher enables ranging
+ - Subscriber enables ranging with min such that out of range (min=large)
+
+ Expect: discovery with distance
+ """
+ self.run_discovery(
+ p_config=autils.add_ranging_to_pub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.PUBLISH_TYPE_UNSOLICITED),
+ enable_ranging=True),
+ s_config=autils.add_ranging_to_sub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.SUBSCRIBE_TYPE_PASSIVE),
+ min_distance_mm=1000000,
+ max_distance_mm=None),
+ expect_discovery=False)
+
+ def test_ranged_discovery_unsolicited_passive_prange_smax_outofrange(self):
+ """Verify discovery with ranging:
+ - Unsolicited Publish/Passive Subscribe
+ - Publisher enables ranging
+ - Subscriber enables ranging with max such that in range (max=0)
+
+ Expect: discovery with distance
+ """
+ self.run_discovery(
+ p_config=autils.add_ranging_to_pub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.PUBLISH_TYPE_UNSOLICITED),
+ enable_ranging=True),
+ s_config=autils.add_ranging_to_sub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.SUBSCRIBE_TYPE_PASSIVE),
+ min_distance_mm=None,
+ max_distance_mm=0),
+ expect_discovery=False)
+
+ def test_ranged_discovery_unsolicited_passive_prange_sminmax_outofrange(self):
+ """Verify discovery with ranging:
+ - Unsolicited Publish/Passive Subscribe
+ - Publisher enables ranging
+ - Subscriber enables ranging with min/max such that out of range (min=large,
+ max=large+1)
+
+ Expect: discovery with distance
+ """
+ self.run_discovery(
+ p_config=autils.add_ranging_to_pub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.PUBLISH_TYPE_UNSOLICITED),
+ enable_ranging=True),
+ s_config=autils.add_ranging_to_sub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.SUBSCRIBE_TYPE_PASSIVE),
+ min_distance_mm=1000000,
+ max_distance_mm=1000001),
+ expect_discovery=False)
+
+ def test_ranged_discovery_solicited_active_prange_smin_outofrange(self):
+ """Verify discovery with ranging:
+ - Solicited Publish/Active Subscribe
+ - Publisher enables ranging
+ - Subscriber enables ranging with min such that out of range (min=large)
+
+ Expect: discovery with distance
+ """
+ self.run_discovery(
+ p_config=autils.add_ranging_to_pub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.PUBLISH_TYPE_SOLICITED),
+ enable_ranging=True),
+ s_config=autils.add_ranging_to_sub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.SUBSCRIBE_TYPE_ACTIVE),
+ min_distance_mm=1000000,
+ max_distance_mm=None),
+ expect_discovery=False)
+
+ def test_ranged_discovery_solicited_active_prange_smax_outofrange(self):
+ """Verify discovery with ranging:
+ - Solicited Publish/Active Subscribe
+ - Publisher enables ranging
+ - Subscriber enables ranging with max such that out of range (max=0)
+
+ Expect: discovery with distance
+ """
+ self.run_discovery(
+ p_config=autils.add_ranging_to_pub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.PUBLISH_TYPE_SOLICITED),
+ enable_ranging=True),
+ s_config=autils.add_ranging_to_sub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.SUBSCRIBE_TYPE_ACTIVE),
+ min_distance_mm=None,
+ max_distance_mm=0),
+ expect_discovery=False)
+
+ def test_ranged_discovery_solicited_active_prange_sminmax_outofrange(self):
+ """Verify discovery with ranging:
+ - Solicited Publish/Active Subscribe
+ - Publisher enables ranging
+ - Subscriber enables ranging with min/max such that out of range (min=large,
+ max=large+1)
+
+ Expect: discovery with distance
+ """
+ self.run_discovery(
+ p_config=autils.add_ranging_to_pub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.PUBLISH_TYPE_SOLICITED),
+ enable_ranging=True),
+ s_config=autils.add_ranging_to_sub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.SUBSCRIBE_TYPE_ACTIVE),
+ min_distance_mm=1000000,
+ max_distance_mm=1000001),
+ expect_discovery=False)
\ No newline at end of file
diff --git a/acts/tests/google/wifi/rtt/functional/RangeApMiscTest.py b/acts/tests/google/wifi/rtt/functional/RangeApMiscTest.py
new file mode 100644
index 0000000..dd5560d
--- /dev/null
+++ b/acts/tests/google/wifi/rtt/functional/RangeApMiscTest.py
@@ -0,0 +1,85 @@
+#!/usr/bin/python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from acts import asserts
+from acts.test_utils.wifi import wifi_test_utils as wutils
+from acts.test_utils.wifi.rtt import rtt_const as rconsts
+from acts.test_utils.wifi.rtt import rtt_test_utils as rutils
+from acts.test_utils.wifi.rtt.RttBaseTest import RttBaseTest
+
+
+class RangeApMiscTest(RttBaseTest):
+ """Test class for RTT ranging to Access Points - miscellaneous tests which
+ do not fit into the strict IEEE 802.11mc supporting or non-supporting test
+ beds - e.g. a mixed test."""
+
+ # Number of RTT iterations
+ NUM_ITER = 10
+
+ # Time gap (in seconds) between iterations
+ TIME_BETWEEN_ITERATIONS = 0
+
+ def __init__(self, controllers):
+ RttBaseTest.__init__(self, controllers)
+
+ #############################################################################
+
+ def test_rtt_mixed_80211mc_supporting_aps_wo_privilege(self):
+ """Scan for APs and perform RTT on one supporting and one non-supporting
+ IEEE 802.11mc APs with the device not having privilege access (expect
+ failures)."""
+ dut = self.android_devices[0]
+ rutils.config_privilege_override(dut, True)
+ rtt_aps = rutils.scan_with_rtt_support_constraint(dut, True)
+ non_rtt_aps = rutils.scan_with_rtt_support_constraint(dut, False)
+ mix_list = [rtt_aps[0], non_rtt_aps[0]]
+ dut.log.debug("Visible non-IEEE 802.11mc APs=%s", mix_list)
+ events = rutils.run_ranging(dut, mix_list, self.NUM_ITER,
+ self.TIME_BETWEEN_ITERATIONS)
+ stats = rutils.analyze_results(events, self.rtt_reference_distance_mm,
+ self.rtt_reference_distance_margin_mm,
+ self.rtt_min_expected_rssi_dbm,
+ self.lci_reference, self.lcr_reference)
+ dut.log.debug("Stats=%s", stats)
+
+ for bssid, stat in stats.items():
+ asserts.assert_true(stat['num_no_results'] == 0,
+ "Missing (timed-out) results", extras=stats)
+ if bssid == rtt_aps[0][wutils.WifiEnums.BSSID_KEY]:
+ asserts.assert_false(stat['any_lci_mismatch'],
+ "LCI mismatch", extras=stats)
+ asserts.assert_false(stat['any_lcr_mismatch'],
+ "LCR mismatch", extras=stats)
+ asserts.assert_equal(stat['num_invalid_rssi'], 0, "Invalid RSSI",
+ extras=stats)
+ asserts.assert_true(stat['num_failures'] <=
+ self.rtt_max_failure_rate_two_sided_rtt_percentage
+ * stat['num_results'] / 100,
+ "Failure rate is too high", extras=stats)
+ asserts.assert_true(stat['num_range_out_of_margin'] <=
+ self.rtt_max_margin_exceeded_rate_two_sided_rtt_percentage
+ * stat['num_success_results'] / 100,
+ "Results exceeding error margin rate is too high",
+ extras=stats)
+ else:
+ asserts.assert_true(stat['num_failures'] == self.NUM_ITER,
+ "All one-sided RTT requests must fail when executed without privilege",
+ extras=stats)
+ for code in stat['status_codes']:
+ asserts.assert_true(code ==
+ rconsts.EVENT_CB_RANGING_STATUS_RESPONDER_DOES_NOT_SUPPORT_IEEE80211MC,
+ "Expected non-support error code", extras=stats)
+ asserts.explicit_pass("RTT test done", extras=stats)
diff --git a/acts/tests/google/wifi/rtt/functional/RangeApNonSupporting11McTest.py b/acts/tests/google/wifi/rtt/functional/RangeApNonSupporting11McTest.py
new file mode 100644
index 0000000..6aa4479
--- /dev/null
+++ b/acts/tests/google/wifi/rtt/functional/RangeApNonSupporting11McTest.py
@@ -0,0 +1,125 @@
+#!/usr/bin/python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import queue
+
+from acts import asserts
+from acts.test_utils.wifi import wifi_test_utils as wutils
+from acts.test_utils.wifi.rtt import rtt_const as rconsts
+from acts.test_utils.wifi.rtt import rtt_test_utils as rutils
+from acts.test_utils.wifi.rtt.RttBaseTest import RttBaseTest
+
+
+class RangeApNonSupporting11McTest(RttBaseTest):
+ """Test class for RTT ranging to Access Points which do not support IEEE
+ 802.11mc"""
+
+ # Number of RTT iterations
+ NUM_ITER = 10
+
+ # Time gap (in seconds) between iterations
+ TIME_BETWEEN_ITERATIONS = 0
+
+ def __init__(self, controllers):
+ RttBaseTest.__init__(self, controllers)
+
+ #############################################################################
+
+ def test_rtt_non_80211mc_supporting_aps(self):
+ """Scan for APs and perform RTT on non-IEEE 802.11mc supporting APs"""
+ dut = self.android_devices[0]
+ non_rtt_aps = rutils.scan_with_rtt_support_constraint(dut, False)
+ dut.log.debug("Visible non-IEEE 802.11mc APs=%s", non_rtt_aps)
+ events = rutils.run_ranging(dut, non_rtt_aps, self.NUM_ITER,
+ self.TIME_BETWEEN_ITERATIONS)
+ stats = rutils.analyze_results(events, self.rtt_reference_distance_mm,
+ self.rtt_reference_distance_margin_mm,
+ self.rtt_min_expected_rssi_dbm,
+ self.lci_reference, self.lcr_reference)
+ dut.log.debug("Stats=%s", stats)
+
+ for bssid, stat in stats.items():
+ asserts.assert_true(stat['num_no_results'] == 0,
+ "Missing (timed-out) results", extras=stats)
+ asserts.assert_false(stat['any_lci_mismatch'],
+ "LCI mismatch", extras=stats)
+ asserts.assert_false(stat['any_lcr_mismatch'],
+ "LCR mismatch", extras=stats)
+ asserts.assert_equal(stat['num_invalid_rssi'], 0, "Invalid RSSI",
+ extras=stats)
+ asserts.assert_true(stat['num_failures'] <=
+ self.rtt_max_failure_rate_one_sided_rtt_percentage
+ * stat['num_results'] / 100,
+ "Failure rate is too high", extras=stats)
+ asserts.assert_true(stat['num_range_out_of_margin'] <=
+ self.rtt_max_margin_exceeded_rate_one_sided_rtt_percentage
+ * stat['num_success_results'] / 100,
+ "Results exceeding error margin rate is too high",
+ extras=stats)
+ asserts.explicit_pass("RTT test done", extras=stats)
+
+ def test_rtt_non_80211mc_supporting_aps_wo_privilege(self):
+ """Scan for APs and perform RTT on non-IEEE 802.11mc supporting APs with the
+ device not having privilege access (expect failures)."""
+ dut = self.android_devices[0]
+ rutils.config_privilege_override(dut, True)
+ non_rtt_aps = rutils.scan_with_rtt_support_constraint(dut, False)
+ dut.log.debug("Visible non-IEEE 802.11mc APs=%s", non_rtt_aps)
+ events = rutils.run_ranging(dut, non_rtt_aps, self.NUM_ITER,
+ self.TIME_BETWEEN_ITERATIONS)
+ stats = rutils.analyze_results(events, self.rtt_reference_distance_mm,
+ self.rtt_reference_distance_margin_mm,
+ self.rtt_min_expected_rssi_dbm,
+ self.lci_reference, self.lcr_reference)
+ dut.log.debug("Stats=%s", stats)
+
+ for bssid, stat in stats.items():
+ asserts.assert_true(stat['num_no_results'] == 0,
+ "Missing (timed-out) results", extras=stats)
+ asserts.assert_true(stat['num_failures'] == self.NUM_ITER,
+ "All one-sided RTT requests must fail when executed without privilege",
+ extras=stats)
+ for code in stat['status_codes']:
+ asserts.assert_true(code ==
+ rconsts.EVENT_CB_RANGING_STATUS_RESPONDER_DOES_NOT_SUPPORT_IEEE80211MC,
+ "Expected non-support error code", extras=stats)
+ asserts.explicit_pass("RTT test done", extras=stats)
+
+ def test_rtt_non_80211mc_supporting_ap_faked_as_supporting(self):
+ """Scan for APs which do not support IEEE 802.11mc, maliciously modify the
+ Responder config to indicate support and pass-through to service. Verify
+ that get an error result.
+ """
+ dut = self.android_devices[0]
+ non_rtt_aps = rutils.scan_with_rtt_support_constraint(dut, False)
+ non_rtt_aps = non_rtt_aps[0:1] # pick first
+ non_rtt_aps[0][rconsts.SCAN_RESULT_KEY_RTT_RESPONDER] = True # falsify
+ dut.log.debug("Visible non-IEEE 802.11mc APs=%s", non_rtt_aps)
+ events = rutils.run_ranging(dut, non_rtt_aps, self.NUM_ITER,
+ self.TIME_BETWEEN_ITERATIONS)
+ stats = rutils.analyze_results(events, self.rtt_reference_distance_mm,
+ self.rtt_reference_distance_margin_mm,
+ self.rtt_min_expected_rssi_dbm,
+ self.lci_reference, self.lcr_reference)
+ dut.log.debug("Stats=%s", stats)
+
+ for bssid, stat in stats.items():
+ asserts.assert_true(stat['num_no_results'] == 0,
+ "Missing (timed-out) results", extras=stats)
+ asserts.assert_true(stat['num_failures'] == self.NUM_ITER,
+ "Failures expected for falsified responder config",
+ extras=stats)
+ asserts.explicit_pass("RTT test done", extras=stats)
diff --git a/acts/tests/google/wifi/rtt/functional/RangeApSupporting11McTest.py b/acts/tests/google/wifi/rtt/functional/RangeApSupporting11McTest.py
new file mode 100644
index 0000000..4eb08dd
--- /dev/null
+++ b/acts/tests/google/wifi/rtt/functional/RangeApSupporting11McTest.py
@@ -0,0 +1,167 @@
+#!/usr/bin/python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import queue
+
+from acts import asserts
+from acts.test_utils.wifi import wifi_test_utils as wutils
+from acts.test_utils.wifi.rtt import rtt_const as rconsts
+from acts.test_utils.wifi.rtt import rtt_test_utils as rutils
+from acts.test_utils.wifi.rtt.RttBaseTest import RttBaseTest
+
+
+class RangeApSupporting11McTest(RttBaseTest):
+ """Test class for RTT ranging to Access Points which support IEEE 802.11mc"""
+
+ # Number of RTT iterations
+ NUM_ITER = 10
+
+ # Time gap (in seconds) between iterations
+ TIME_BETWEEN_ITERATIONS = 0
+
+ def __init__(self, controllers):
+ RttBaseTest.__init__(self, controllers)
+
+ #############################################################################
+
+ def test_rtt_80211mc_supporting_aps(self):
+ """Scan for APs and perform RTT only to those which support 802.11mc"""
+ dut = self.android_devices[0]
+ rtt_supporting_aps = rutils.scan_with_rtt_support_constraint(dut, True,
+ repeat=10)
+ dut.log.debug("RTT Supporting APs=%s", rtt_supporting_aps)
+ events = rutils.run_ranging(dut, rtt_supporting_aps, self.NUM_ITER,
+ self.TIME_BETWEEN_ITERATIONS)
+ stats = rutils.analyze_results(events, self.rtt_reference_distance_mm,
+ self.rtt_reference_distance_margin_mm,
+ self.rtt_min_expected_rssi_dbm,
+ self.lci_reference, self.lcr_reference)
+ dut.log.debug("Stats=%s", stats)
+
+ for bssid, stat in stats.items():
+ asserts.assert_true(stat['num_no_results'] == 0,
+ "Missing (timed-out) results", extras=stats)
+ asserts.assert_false(stat['any_lci_mismatch'],
+ "LCI mismatch", extras=stats)
+ asserts.assert_false(stat['any_lcr_mismatch'],
+ "LCR mismatch", extras=stats)
+ asserts.assert_equal(stat['num_invalid_rssi'], 0, "Invalid RSSI",
+ extras=stats)
+ asserts.assert_true(stat['num_failures'] <=
+ self.rtt_max_failure_rate_two_sided_rtt_percentage
+ * stat['num_results'] / 100,
+ "Failure rate is too high", extras=stats)
+ asserts.assert_true(stat['num_range_out_of_margin'] <=
+ self.rtt_max_margin_exceeded_rate_two_sided_rtt_percentage
+ * stat['num_success_results'] / 100,
+ "Results exceeding error margin rate is too high", extras=stats)
+ asserts.explicit_pass("RTT test done", extras=stats)
+
+ #########################################################################
+ #
+ # LEGACY API test code
+ #
+ #########################################################################
+
+ def test_legacy_rtt_80211mc_supporting_aps(self):
+ """Scan for APs and perform RTT only to those which support 802.11mc - using
+ the LEGACY API!"""
+ dut = self.android_devices[0]
+ rtt_supporting_aps = rutils.scan_with_rtt_support_constraint(dut, True,
+ repeat=10)
+ dut.log.debug("RTT Supporting APs=%s", rtt_supporting_aps)
+
+ rtt_configs = []
+ for ap in rtt_supporting_aps:
+ rtt_configs.append(self.rtt_config_from_scan_result(ap))
+ dut.log.debug("RTT configs=%s", rtt_configs)
+
+ results = []
+ num_missing = 0
+ for i in range(self.NUM_ITER):
+ idx = dut.droid.wifiRttStartRanging(rtt_configs)
+ event = None
+ try:
+ events = dut.ed.pop_events("WifiRttRanging%d" % idx, 30)
+ dut.log.debug("Event=%s", events)
+ for event in events:
+ results.append(event["data"][rconsts.EVENT_CB_RANGING_KEY_RESULTS])
+ except queue.Empty:
+ self.log.debug("Waiting for RTT event timed out.")
+ results.append([])
+ num_missing = num_missing + 1
+
+ # basic error checking:
+ # 1. no missing
+ # 2. overall (all BSSIDs) success rate > threshold
+ asserts.assert_equal(num_missing, 0,
+ "Missing results (timeout waiting for event)",
+ extras=results)
+
+ num_results = 0
+ num_errors = 0
+ for result_group in results:
+ num_results = num_results + len(result_group)
+ for result in result_group:
+ if result["status"] != 0:
+ num_errors = num_errors + 1
+
+ extras = [results, {"num_results": num_results, "num_errors": num_errors}]
+ asserts.assert_true(
+ num_errors <= self.rtt_max_failure_rate_two_sided_rtt_percentage
+ * num_results / 100,
+ "Failure rate is too high", extras=extras)
+ asserts.explicit_pass("RTT test done", extras=extras)
+
+ def rtt_config_from_scan_result(self, scan_result):
+ """Creates an Rtt configuration based on the scan result of a network.
+ """
+ WifiEnums = wutils.WifiEnums
+ ScanResult = WifiEnums.ScanResult
+ RttParam = WifiEnums.RttParam
+ RttBW = WifiEnums.RttBW
+ RttPreamble = WifiEnums.RttPreamble
+ RttType = WifiEnums.RttType
+
+ scan_result_channel_width_to_rtt = {
+ ScanResult.CHANNEL_WIDTH_20MHZ: RttBW.BW_20_SUPPORT,
+ ScanResult.CHANNEL_WIDTH_40MHZ: RttBW.BW_40_SUPPORT,
+ ScanResult.CHANNEL_WIDTH_80MHZ: RttBW.BW_80_SUPPORT,
+ ScanResult.CHANNEL_WIDTH_160MHZ: RttBW.BW_160_SUPPORT,
+ ScanResult.CHANNEL_WIDTH_80MHZ_PLUS_MHZ: RttBW.BW_160_SUPPORT
+ }
+ p = {}
+ freq = scan_result[RttParam.frequency]
+ p[RttParam.frequency] = freq
+ p[RttParam.BSSID] = scan_result[WifiEnums.BSSID_KEY]
+ if freq > 5000:
+ p[RttParam.preamble] = RttPreamble.PREAMBLE_VHT
+ else:
+ p[RttParam.preamble] = RttPreamble.PREAMBLE_HT
+ cf0 = scan_result[RttParam.center_freq0]
+ if cf0 > 0:
+ p[RttParam.center_freq0] = cf0
+ cf1 = scan_result[RttParam.center_freq1]
+ if cf1 > 0:
+ p[RttParam.center_freq1] = cf1
+ cw = scan_result["channelWidth"]
+ p[RttParam.channel_width] = cw
+ p[RttParam.bandwidth] = scan_result_channel_width_to_rtt[cw]
+ if scan_result["is80211McRTTResponder"]:
+ p[RttParam.request_type] = RttType.TYPE_TWO_SIDED
+ else:
+ p[RttParam.request_type] = RttType.TYPE_ONE_SIDED
+ return p
diff --git a/acts/tests/google/wifi/rtt/functional/RangeAwareTest.py b/acts/tests/google/wifi/rtt/functional/RangeAwareTest.py
new file mode 100644
index 0000000..819454a
--- /dev/null
+++ b/acts/tests/google/wifi/rtt/functional/RangeAwareTest.py
@@ -0,0 +1,397 @@
+#!/usr/bin/python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import queue
+import time
+
+from acts import asserts
+from acts.test_utils.wifi.aware import aware_const as aconsts
+from acts.test_utils.wifi.aware import aware_test_utils as autils
+from acts.test_utils.wifi.aware.AwareBaseTest import AwareBaseTest
+from acts.test_utils.wifi.rtt import rtt_const as rconsts
+from acts.test_utils.wifi.rtt import rtt_test_utils as rutils
+from acts.test_utils.wifi.rtt.RttBaseTest import RttBaseTest
+
+
+class RangeAwareTest(AwareBaseTest, RttBaseTest):
+ """Test class for RTT ranging to Wi-Fi Aware peers"""
+ SERVICE_NAME = "GoogleTestServiceXY"
+
+ # Number of RTT iterations
+ NUM_ITER = 10
+
+ # Time gap (in seconds) between iterations
+ TIME_BETWEEN_ITERATIONS = 0
+
+ # Time gap (in seconds) when switching between Initiator and Responder
+ TIME_BETWEEN_ROLES = 0
+
+ def __init__(self, controllers):
+ AwareBaseTest.__init__(self, controllers)
+ RttBaseTest.__init__(self, controllers)
+
+ def setup_test(self):
+ """Manual setup here due to multiple inheritance: explicitly execute the
+ setup method from both parents."""
+ AwareBaseTest.setup_test(self)
+ RttBaseTest.setup_test(self)
+
+ def teardown_test(self):
+ """Manual teardown here due to multiple inheritance: explicitly execute the
+ teardown method from both parents."""
+ AwareBaseTest.teardown_test(self)
+ RttBaseTest.teardown_test(self)
+
+ #############################################################################
+
+ def run_rtt_discovery(self, init_dut, resp_mac=None, resp_peer_id=None):
+ """Perform single RTT measurement, using Aware, from the Initiator DUT to
+ a Responder. The RTT Responder can be specified using its MAC address
+ (obtained using out- of-band discovery) or its Peer ID (using Aware
+ discovery).
+
+ Args:
+ init_dut: RTT Initiator device
+ resp_mac: MAC address of the RTT Responder device
+ resp_peer_id: Peer ID of the RTT Responder device
+ """
+ asserts.assert_true(resp_mac is not None or resp_peer_id is not None,
+ "One of the Responder specifications (MAC or Peer ID)"
+ " must be provided!")
+ if resp_mac is not None:
+ id = init_dut.droid.wifiRttStartRangingToAwarePeerMac(resp_mac)
+ else:
+ id = init_dut.droid.wifiRttStartRangingToAwarePeerId(resp_peer_id)
+ try:
+ event = init_dut.ed.pop_event(rutils.decorate_event(
+ rconsts.EVENT_CB_RANGING_ON_RESULT, id), rutils.EVENT_TIMEOUT)
+ result = event["data"][rconsts.EVENT_CB_RANGING_KEY_RESULTS][0]
+ if resp_mac is not None:
+ rutils.validate_aware_mac_result(result, resp_mac, "DUT")
+ else:
+ rutils.validate_aware_peer_id_result(result, resp_peer_id, "DUT")
+ return result
+ except queue.Empty:
+ return None
+
+ def run_rtt_ib_discovery_set(self, do_both_directions, iter_count,
+ time_between_iterations, time_between_roles):
+ """Perform a set of RTT measurements, using in-band (Aware) discovery.
+
+ Args:
+ do_both_directions: False - perform all measurements in one direction,
+ True - perform 2 measurements one in both directions.
+ iter_count: Number of measurements to perform.
+ time_between_iterations: Number of seconds to wait between iterations.
+ time_between_roles: Number of seconds to wait when switching between
+ Initiator and Responder roles (only matters if
+ do_both_directions=True).
+
+ Returns: a list of the events containing the RTT results (or None for a
+ failed measurement). If both directions are tested then returns a list of
+ 2 elements: one set for each direction.
+ """
+ p_dut = self.android_devices[0]
+ s_dut = self.android_devices[1]
+
+ (p_id, s_id, p_disc_id, s_disc_id,
+ peer_id_on_sub, peer_id_on_pub) = autils.create_discovery_pair(
+ p_dut,
+ s_dut,
+ p_config=autils.add_ranging_to_pub(autils.create_discovery_config(
+ self.SERVICE_NAME, aconsts.PUBLISH_TYPE_UNSOLICITED), True),
+ s_config=autils.add_ranging_to_pub(autils.create_discovery_config(
+ self.SERVICE_NAME, aconsts.SUBSCRIBE_TYPE_PASSIVE), True),
+ device_startup_offset=self.device_startup_offset,
+ msg_id=self.get_next_msg_id())
+
+ resultsPS = []
+ resultsSP = []
+ for i in range(iter_count):
+ if i != 0 and time_between_iterations != 0:
+ time.sleep(time_between_iterations)
+
+ # perform RTT from pub -> sub
+ resultsPS.append(
+ self.run_rtt_discovery(p_dut, resp_peer_id=peer_id_on_pub))
+
+ if do_both_directions:
+ if time_between_roles != 0:
+ time.sleep(time_between_roles)
+
+ # perform RTT from sub -> pub
+ resultsSP.append(
+ self.run_rtt_discovery(s_dut, resp_peer_id=peer_id_on_sub))
+
+ return resultsPS if not do_both_directions else [resultsPS, resultsSP]
+
+ def run_rtt_oob_discovery_set(self, do_both_directions, iter_count,
+ time_between_iterations, time_between_roles):
+ """Perform a set of RTT measurements, using out-of-band discovery.
+
+ Args:
+ do_both_directions: False - perform all measurements in one direction,
+ True - perform 2 measurements one in both directions.
+ iter_count: Number of measurements to perform.
+ time_between_iterations: Number of seconds to wait between iterations.
+ time_between_roles: Number of seconds to wait when switching between
+ Initiator and Responder roles (only matters if
+ do_both_directions=True).
+ enable_ranging: True to enable Ranging, False to disable.
+
+ Returns: a list of the events containing the RTT results (or None for a
+ failed measurement). If both directions are tested then returns a list of
+ 2 elements: one set for each direction.
+ """
+ dut0 = self.android_devices[0]
+ dut1 = self.android_devices[1]
+
+ id0, mac0 = autils.attach_with_identity(dut0)
+ id1, mac1 = autils.attach_with_identity(dut1)
+
+ # wait for for devices to synchronize with each other - there are no other
+ # mechanisms to make sure this happens for OOB discovery (except retrying
+ # to execute the data-path request)
+ time.sleep(autils.WAIT_FOR_CLUSTER)
+
+ # start publisher(s) on the Responder(s) with ranging enabled
+ p_config = autils.add_ranging_to_pub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.PUBLISH_TYPE_UNSOLICITED),
+ enable_ranging=True)
+ dut1.droid.wifiAwarePublish(id1, p_config)
+ autils.wait_for_event(dut1, aconsts.SESSION_CB_ON_PUBLISH_STARTED)
+ if do_both_directions:
+ dut0.droid.wifiAwarePublish(id0, p_config)
+ autils.wait_for_event(dut0, aconsts.SESSION_CB_ON_PUBLISH_STARTED)
+
+ results01 = []
+ results10 = []
+ for i in range(iter_count):
+ if i != 0 and time_between_iterations != 0:
+ time.sleep(time_between_iterations)
+
+ # perform RTT from dut0 -> dut1
+ results01.append(
+ self.run_rtt_discovery(dut0, resp_mac=mac1))
+
+ if do_both_directions:
+ if time_between_roles != 0:
+ time.sleep(time_between_roles)
+
+ # perform RTT from dut1 -> dut0
+ results10.append(
+ self.run_rtt_discovery(dut1, resp_mac=mac0))
+
+ return results01 if not do_both_directions else [results01, results10]
+
+ def verify_results(self, results, results_reverse_direction=None):
+ """Verifies the results of the RTT experiment.
+
+ Args:
+ results: List of RTT results.
+ results_reverse_direction: List of RTT results executed in the
+ reverse direction. Optional.
+ """
+ stats = rutils.extract_stats(results, self.rtt_reference_distance_mm,
+ self.rtt_reference_distance_margin_mm,
+ self.rtt_min_expected_rssi_dbm)
+ stats_reverse_direction = None
+ if results_reverse_direction is not None:
+ stats_reverse_direction = rutils.extract_stats(results_reverse_direction,
+ self.rtt_reference_distance_mm, self.rtt_reference_distance_margin_mm,
+ self.rtt_min_expected_rssi_dbm)
+ self.log.debug("Stats: %s", stats)
+ if stats_reverse_direction is not None:
+ self.log.debug("Stats in reverse direction: %s", stats_reverse_direction)
+
+ extras = stats if stats_reverse_direction is None else [stats,
+ stats_reverse_direction]
+
+ asserts.assert_true(stats['num_no_results'] == 0,
+ "Missing (timed-out) results", extras=extras)
+ asserts.assert_false(stats['any_lci_mismatch'],
+ "LCI mismatch", extras=extras)
+ asserts.assert_false(stats['any_lcr_mismatch'],
+ "LCR mismatch", extras=extras)
+ asserts.assert_equal(stats['num_invalid_rssi'], 0, "Invalid RSSI",
+ extras=extras)
+ asserts.assert_true(
+ stats['num_failures'] <=
+ self.rtt_max_failure_rate_two_sided_rtt_percentage
+ * stats['num_results'] / 100,
+ "Failure rate is too high", extras=extras)
+ asserts.assert_true(
+ stats['num_range_out_of_margin']
+ <= self.rtt_max_margin_exceeded_rate_two_sided_rtt_percentage
+ * stats['num_success_results'] / 100,
+ "Results exceeding error margin rate is too high", extras=extras)
+
+ if stats_reverse_direction is not None:
+ asserts.assert_true(stats_reverse_direction['num_no_results'] == 0,
+ "Missing (timed-out) results",
+ extras=extras)
+ asserts.assert_false(stats['any_lci_mismatch'],
+ "LCI mismatch", extras=extras)
+ asserts.assert_false(stats['any_lcr_mismatch'],
+ "LCR mismatch", extras=extras)
+ asserts.assert_equal(stats['num_invalid_rssi'], 0, "Invalid RSSI",
+ extras=extras)
+ asserts.assert_true(
+ stats_reverse_direction['num_failures']
+ <= self.rtt_max_failure_rate_two_sided_rtt_percentage
+ * stats['num_results'] / 100,
+ "Failure rate is too high", extras=extras)
+ asserts.assert_true(
+ stats_reverse_direction['num_range_out_of_margin']
+ <= self.rtt_max_margin_exceeded_rate_two_sided_rtt_percentage
+ * stats['num_success_results'] / 100,
+ "Results exceeding error margin rate is too high",
+ extras=extras)
+
+ asserts.explicit_pass("RTT Aware test done", extras=extras)
+
+ #############################################################################
+
+ def test_rtt_oob_discovery_one_way(self):
+ """Perform RTT between 2 Wi-Fi Aware devices. Use out-of-band discovery
+ to communicate the MAC addresses to the peer. Test one-direction RTT only.
+ """
+ rtt_results = self.run_rtt_oob_discovery_set(do_both_directions=False,
+ iter_count=self.NUM_ITER,
+ time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
+ time_between_roles=self.TIME_BETWEEN_ROLES)
+ self.verify_results(rtt_results)
+
+ def test_rtt_oob_discovery_both_ways(self):
+ """Perform RTT between 2 Wi-Fi Aware devices. Use out-of-band discovery
+ to communicate the MAC addresses to the peer. Test RTT both-ways:
+ switching rapidly between Initiator and Responder.
+ """
+ rtt_results1, rtt_results2 = self.run_rtt_oob_discovery_set(
+ do_both_directions=True, iter_count=self.NUM_ITER,
+ time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
+ time_between_roles=self.TIME_BETWEEN_ROLES)
+ self.verify_results(rtt_results1, rtt_results2)
+
+ def test_rtt_ib_discovery_one_way(self):
+ """Perform RTT between 2 Wi-Fi Aware devices. Use in-band (Aware) discovery
+ to communicate the MAC addresses to the peer. Test one-direction RTT only.
+ """
+ rtt_results = self.run_rtt_ib_discovery_set(do_both_directions=False,
+ iter_count=self.NUM_ITER,
+ time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
+ time_between_roles=self.TIME_BETWEEN_ROLES)
+ self.verify_results(rtt_results)
+
+ def test_rtt_ib_discovery_both_ways(self):
+ """Perform RTT between 2 Wi-Fi Aware devices. Use in-band (Aware) discovery
+ to communicate the MAC addresses to the peer. Test RTT both-ways:
+ switching rapidly between Initiator and Responder.
+ """
+ rtt_results1, rtt_results2 = self.run_rtt_ib_discovery_set(
+ do_both_directions=True, iter_count=self.NUM_ITER,
+ time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
+ time_between_roles=self.TIME_BETWEEN_ROLES)
+ self.verify_results(rtt_results1, rtt_results2)
+
+ def test_rtt_without_initiator_aware(self):
+ """Try to perform RTT operation when there is no local Aware session (on the
+ Initiator). The Responder is configured normally: Aware on and a Publisher
+ with Ranging enable. Should FAIL."""
+ init_dut = self.android_devices[0]
+ resp_dut = self.android_devices[1]
+
+ # Enable a Responder and start a Publisher
+ resp_id = resp_dut.droid.wifiAwareAttach(True)
+ autils.wait_for_event(resp_dut, aconsts.EVENT_CB_ON_ATTACHED)
+ resp_ident_event = autils.wait_for_event(resp_dut,
+ aconsts.EVENT_CB_ON_IDENTITY_CHANGED)
+ resp_mac = resp_ident_event['data']['mac']
+
+ resp_config = autils.add_ranging_to_pub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.PUBLISH_TYPE_UNSOLICITED),
+ enable_ranging=True)
+ resp_dut.droid.wifiAwarePublish(resp_id, resp_config)
+ autils.wait_for_event(resp_dut, aconsts.SESSION_CB_ON_PUBLISH_STARTED)
+
+ # Initiate an RTT to Responder (no Aware started on Initiator!)
+ results = []
+ num_no_responses = 0
+ num_successes = 0
+ for i in range(self.NUM_ITER):
+ result = self.run_rtt_discovery(init_dut, resp_mac=resp_mac)
+ self.log.debug("result: %s", result)
+ results.append(result)
+ if result is None:
+ num_no_responses = num_no_responses + 1
+ elif (result[rconsts.EVENT_CB_RANGING_KEY_STATUS]
+ == rconsts.EVENT_CB_RANGING_STATUS_SUCCESS):
+ num_successes = num_successes + 1
+
+ asserts.assert_equal(num_no_responses, 0, "No RTT response?",
+ extras=results)
+ asserts.assert_equal(num_successes, 0, "Aware RTT w/o Aware should FAIL!",
+ extras=results)
+ asserts.explicit_pass("RTT Aware test done", extras=results)
+
+ def test_rtt_without_responder_aware(self):
+ """Try to perform RTT operation when there is no peer Aware session (on the
+ Responder). Should FAIL."""
+ init_dut = self.android_devices[0]
+ resp_dut = self.android_devices[1]
+
+ # Enable a Responder and start a Publisher
+ resp_id = resp_dut.droid.wifiAwareAttach(True)
+ autils.wait_for_event(resp_dut, aconsts.EVENT_CB_ON_ATTACHED)
+ resp_ident_event = autils.wait_for_event(resp_dut,
+ aconsts.EVENT_CB_ON_IDENTITY_CHANGED)
+ resp_mac = resp_ident_event['data']['mac']
+
+ resp_config = autils.add_ranging_to_pub(
+ autils.create_discovery_config(self.SERVICE_NAME,
+ aconsts.PUBLISH_TYPE_UNSOLICITED),
+ enable_ranging=True)
+ resp_dut.droid.wifiAwarePublish(resp_id, resp_config)
+ autils.wait_for_event(resp_dut, aconsts.SESSION_CB_ON_PUBLISH_STARTED)
+
+ # Disable Responder
+ resp_dut.droid.wifiAwareDestroy(resp_id)
+
+ # Enable the Initiator
+ init_id = init_dut.droid.wifiAwareAttach()
+ autils.wait_for_event(init_dut, aconsts.EVENT_CB_ON_ATTACHED)
+
+ # Initiate an RTT to Responder (no Aware started on Initiator!)
+ results = []
+ num_no_responses = 0
+ num_successes = 0
+ for i in range(self.NUM_ITER):
+ result = self.run_rtt_discovery(init_dut, resp_mac=resp_mac)
+ self.log.debug("result: %s", result)
+ results.append(result)
+ if result is None:
+ num_no_responses = num_no_responses + 1
+ elif (result[rconsts.EVENT_CB_RANGING_KEY_STATUS]
+ == rconsts.EVENT_CB_RANGING_STATUS_SUCCESS):
+ num_successes = num_successes + 1
+
+ asserts.assert_equal(num_no_responses, 0, "No RTT response?",
+ extras=results)
+ asserts.assert_equal(num_successes, 0, "Aware RTT w/o Aware should FAIL!",
+ extras=results)
+ asserts.explicit_pass("RTT Aware test done", extras=results)
diff --git a/acts/tests/google/wifi/rtt/functional/RttDisableTest.py b/acts/tests/google/wifi/rtt/functional/RttDisableTest.py
new file mode 100644
index 0000000..d8dd1a1
--- /dev/null
+++ b/acts/tests/google/wifi/rtt/functional/RttDisableTest.py
@@ -0,0 +1,100 @@
+#!/usr/bin/python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from acts import asserts
+from acts import utils
+from acts.test_utils.wifi.rtt import rtt_const as rconsts
+from acts.test_utils.wifi.rtt import rtt_test_utils as rutils
+from acts.test_utils.wifi.rtt.RttBaseTest import RttBaseTest
+
+
+class RttDisableTest(RttBaseTest):
+ """Test class for RTT ranging enable/disable flows."""
+
+ MODE_DISABLE_WIFI = 0
+ MODE_ENABLE_DOZE = 1
+ MODE_DISABLE_LOCATIONING = 2
+
+ def __init__(self, controllers):
+ RttBaseTest.__init__(self, controllers)
+
+ def run_disable_rtt(self, disable_mode):
+ """Validate the RTT disabled flows: whether by disabling Wi-Fi or entering
+ doze mode.
+
+ Args:
+ disable_mode: The particular mechanism in which RTT is disabled. One of
+ the MODE_* constants.
+ """
+ dut = self.android_devices[0]
+
+ # validate start-up conditions
+ asserts.assert_true(dut.droid.wifiIsRttAvailable(), "RTT is not available")
+
+ # scan to get some APs to be used later
+ all_aps = rutils.scan_networks(dut)
+ asserts.assert_true(len(all_aps) > 0, "Need at least one visible AP!")
+
+ # disable RTT and validate broadcast & API
+ if disable_mode == self.MODE_DISABLE_WIFI:
+ # disabling Wi-Fi is not sufficient: since scan mode (and hence RTT) will
+ # remain enabled - we need to disable the Wi-Fi chip aka Airplane Mode
+ asserts.assert_true(utils.force_airplane_mode(dut, True),
+ "Can not turn on airplane mode on: %s" % dut.serial)
+ elif disable_mode == self.MODE_ENABLE_DOZE:
+ asserts.assert_true(utils.enable_doze(dut), "Can't enable doze")
+ elif disable_mode == self.MODE_DISABLE_LOCATIONING:
+ utils.set_location_service(dut, False)
+
+ rutils.wait_for_event(dut, rconsts.BROADCAST_WIFI_RTT_NOT_AVAILABLE)
+ asserts.assert_false(dut.droid.wifiIsRttAvailable(), "RTT is available")
+
+ # request a range and validate error
+ id = dut.droid.wifiRttStartRangingToAccessPoints(all_aps[0:1])
+ event = rutils.wait_for_event(dut, rutils.decorate_event(
+ rconsts.EVENT_CB_RANGING_ON_FAIL, id))
+ asserts.assert_equal(event["data"][rconsts.EVENT_CB_RANGING_KEY_STATUS],
+ rconsts.RANGING_FAIL_CODE_RTT_NOT_AVAILABLE,
+ "Invalid error code")
+
+ # enable RTT and validate broadcast & API
+ if disable_mode == self.MODE_DISABLE_WIFI:
+ asserts.assert_true(utils.force_airplane_mode(dut, False),
+ "Can not turn off airplane mode on: %s" % dut.serial)
+ elif disable_mode == self.MODE_ENABLE_DOZE:
+ asserts.assert_true(utils.disable_doze(dut), "Can't disable doze")
+ elif disable_mode == self.MODE_DISABLE_LOCATIONING:
+ utils.set_location_service(dut, True)
+
+ rutils.wait_for_event(dut, rconsts.BROADCAST_WIFI_RTT_AVAILABLE)
+ asserts.assert_true(dut.droid.wifiIsRttAvailable(), "RTT is not available")
+
+ ############################################################################
+
+ def test_disable_wifi(self):
+ """Validate that getting expected broadcast when Wi-Fi is disabled and that
+ any range requests are rejected."""
+ self.run_disable_rtt(self.MODE_DISABLE_WIFI)
+
+ def test_enable_doze(self):
+ """Validate that getting expected broadcast when RTT is disabled due to doze
+ mode and that any range requests are rejected."""
+ self.run_disable_rtt(self.MODE_ENABLE_DOZE)
+
+ def test_disable_location(self):
+ """Validate that getting expected broadcast when locationing is disabled and
+ that any range requests are rejected."""
+ self.run_disable_rtt(self.MODE_DISABLE_LOCATIONING)
diff --git a/acts/tests/google/wifi/rtt/functional/RttRequestManagementTest.py b/acts/tests/google/wifi/rtt/functional/RttRequestManagementTest.py
new file mode 100644
index 0000000..fe60552
--- /dev/null
+++ b/acts/tests/google/wifi/rtt/functional/RttRequestManagementTest.py
@@ -0,0 +1,133 @@
+#!/usr/bin/python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import random
+import time
+
+from acts import asserts
+from acts.test_utils.wifi.rtt import rtt_const as rconsts
+from acts.test_utils.wifi.rtt import rtt_test_utils as rutils
+from acts.test_utils.wifi.rtt.RttBaseTest import RttBaseTest
+
+
+class RttRequestManagementTest(RttBaseTest):
+ """Test class for RTT request management flows."""
+
+ SPAMMING_LIMIT = 20
+
+ def __init__(self, controllers):
+ RttBaseTest.__init__(self, controllers)
+
+ #############################################################################
+
+ def test_cancel_ranging(self):
+ """Request a 'large' number of range operations with various UIDs (using the
+ work-source API), then cancel some of them.
+
+ We can't guarantee a reaction time - it is possible that a cancelled test
+ was already finished and it's results dispatched back. The test therefore
+ stacks the request queue. The sequence is:
+
+ - Request:
+ - 50 tests @ UIDs = {uid1, uid2, uid3}
+ - 2 tests @ UIDs = {uid2, uid3}
+ - 1 test2 @ UIDs = {uid1, uid2, uid3}
+ - Cancel UIDs = {uid2, uid3}
+
+ Expect to receive only 51 results.
+ """
+ dut = self.android_devices[0]
+ max_peers = dut.droid.wifiRttMaxPeersInRequest()
+
+ all_uids = [1000, 20, 30] # 1000 = System Server (makes requests foreground)
+ some_uids = [20, 30]
+
+ aps = rutils.scan_with_rtt_support_constraint(dut, True, repeat=10)
+ dut.log.info("RTT Supporting APs=%s", aps)
+
+ asserts.assert_true(
+ len(aps) > 0,
+ "Need at least one AP which supports 802.11mc!")
+ if len(aps) > max_peers:
+ aps = aps[0:max_peers]
+
+ group1_ids = []
+ group2_ids = []
+ group3_ids = []
+
+ # step 1: request <spam_limit> ranging operations on [uid1, uid2, uid3]
+ for i in range(self.SPAMMING_LIMIT):
+ group1_ids.append(
+ dut.droid.wifiRttStartRangingToAccessPoints(aps, all_uids))
+
+ # step 2: request 2 ranging operations on [uid2, uid3]
+ for i in range(2):
+ group2_ids.append(
+ dut.droid.wifiRttStartRangingToAccessPoints(aps, some_uids))
+
+ # step 3: request 1 ranging operation on [uid1, uid2, uid3]
+ for i in range(1):
+ group3_ids.append(
+ dut.droid.wifiRttStartRangingToAccessPoints(aps, all_uids))
+
+ # step 4: cancel ranging requests on [uid2, uid3]
+ dut.droid.wifiRttCancelRanging(some_uids)
+
+ # collect results
+ for i in range(len(group1_ids)):
+ rutils.wait_for_event(dut, rutils.decorate_event(
+ rconsts.EVENT_CB_RANGING_ON_RESULT, group1_ids[i]))
+ time.sleep(rutils.EVENT_TIMEOUT) # optimize time-outs below to single one
+ for i in range(len(group2_ids)):
+ rutils.fail_on_event(dut, rutils.decorate_event(
+ rconsts.EVENT_CB_RANGING_ON_RESULT, group2_ids[i]), 0)
+ for i in range(len(group3_ids)):
+ rutils.wait_for_event(dut, rutils.decorate_event(
+ rconsts.EVENT_CB_RANGING_ON_RESULT, group3_ids[i]))
+
+ def test_throttling(self):
+ """Request sequential range operations using a bogus UID (which will
+ translate as a throttled process) and similarly using the ACTS/sl4a as
+ the source (a foreground/unthrottled process)."""
+ dut = self.android_devices[0]
+ max_peers = dut.droid.wifiRttMaxPeersInRequest()
+
+ # Need to use a random number since the system keeps states and so the
+ # background uid will be throttled on the next run of this script
+ fake_uid = [random.randint(10, 9999)]
+
+ aps = rutils.scan_with_rtt_support_constraint(dut, True, repeat=10)
+ dut.log.info("RTT Supporting APs=%s", aps)
+
+ asserts.assert_true(
+ len(aps) > 0,
+ "Need at least one AP which supports 802.11mc!")
+ if len(aps) > max_peers:
+ aps = aps[0:max_peers]
+
+ id1 = dut.droid.wifiRttStartRangingToAccessPoints(aps) # as ACTS/sl4a
+ id2 = dut.droid.wifiRttStartRangingToAccessPoints(aps, fake_uid)
+ id3 = dut.droid.wifiRttStartRangingToAccessPoints(aps, fake_uid)
+ id4 = dut.droid.wifiRttStartRangingToAccessPoints(aps) # as ACTS/sl4a
+
+ rutils.wait_for_event(dut, rutils.decorate_event(
+ rconsts.EVENT_CB_RANGING_ON_RESULT, id1))
+ rutils.wait_for_event(dut, rutils.decorate_event(
+ rconsts.EVENT_CB_RANGING_ON_RESULT, id2))
+ rutils.wait_for_event(dut, rutils.decorate_event(
+ rconsts.EVENT_CB_RANGING_ON_FAIL, id3))
+ rutils.wait_for_event(dut, rutils.decorate_event(
+ rconsts.EVENT_CB_RANGING_ON_RESULT, id4))
diff --git a/acts/tests/google/wifi/rtt/stress/StressRangeApTest.py b/acts/tests/google/wifi/rtt/stress/StressRangeApTest.py
new file mode 100644
index 0000000..497c125
--- /dev/null
+++ b/acts/tests/google/wifi/rtt/stress/StressRangeApTest.py
@@ -0,0 +1,79 @@
+#!/usr/bin/python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from acts import asserts
+from acts.base_test import BaseTestClass
+from acts.test_utils.wifi.rtt import rtt_test_utils as rutils
+from acts.test_utils.wifi.rtt.RttBaseTest import RttBaseTest
+
+
+class StressRangeApTest(RttBaseTest):
+ """Test class for stress testing of RTT ranging to Access Points"""
+
+ def __init__(self, controllers):
+ BaseTestClass.__init__(self, controllers)
+
+ #############################################################################
+
+ def test_rtt_supporting_ap_only(self):
+ """Scan for APs and perform RTT only to those which support 802.11mc.
+
+ Stress test: repeat ranging to the same AP. Verify rate of success and
+ stability of results.
+ """
+ dut = self.android_devices[0]
+ rtt_supporting_aps = rutils.scan_with_rtt_support_constraint(dut, True,
+ repeat=10)
+ dut.log.debug("RTT Supporting APs=%s", rtt_supporting_aps)
+
+ num_iter = self.stress_test_min_iteration_count
+
+ max_peers = dut.droid.wifiRttMaxPeersInRequest()
+ asserts.assert_true(
+ len(rtt_supporting_aps) > 0,
+ "Need at least one AP which supports 802.11mc!")
+ if len(rtt_supporting_aps) > max_peers:
+ rtt_supporting_aps = rtt_supporting_aps[0:max_peers]
+
+ events = rutils.run_ranging(dut, rtt_supporting_aps, num_iter, 0,
+ self.stress_test_target_run_time_sec)
+ stats = rutils.analyze_results(events, self.rtt_reference_distance_mm,
+ self.rtt_reference_distance_margin_mm,
+ self.rtt_min_expected_rssi_dbm,
+ self.lci_reference, self.lcr_reference,
+ summary_only=True)
+ dut.log.debug("Stats=%s", stats)
+
+ for bssid, stat in stats.items():
+ asserts.assert_true(stat['num_no_results'] == 0,
+ "Missing (timed-out) results", extras=stats)
+ asserts.assert_false(stat['any_lci_mismatch'],
+ "LCI mismatch", extras=stats)
+ asserts.assert_false(stat['any_lcr_mismatch'],
+ "LCR mismatch", extras=stats)
+ asserts.assert_equal(stat['num_invalid_rssi'], 0, "Invalid RSSI",
+ extras=stats)
+ asserts.assert_true(stat['num_failures'] <=
+ self.rtt_max_failure_rate_two_sided_rtt_percentage
+ * stat['num_results'] / 100,
+ "Failure rate is too high", extras=stats)
+ asserts.assert_true(stat['num_range_out_of_margin'] <=
+ self.rtt_max_margin_exceeded_rate_two_sided_rtt_percentage
+ * stat['num_success_results'] / 100,
+ "Results exceeding error margin rate is too high",
+ extras=stats)
+ asserts.explicit_pass("RTT test done", extras=stats)
+
diff --git a/acts/tests/google/wifi/rtt/stress/StressRangeAwareTest.py b/acts/tests/google/wifi/rtt/stress/StressRangeAwareTest.py
new file mode 100644
index 0000000..3073898
--- /dev/null
+++ b/acts/tests/google/wifi/rtt/stress/StressRangeAwareTest.py
@@ -0,0 +1,137 @@
+#!/usr/bin/python3.4
+#
+# Copyright 2018 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import queue
+import time
+
+from acts import asserts
+from acts.test_utils.wifi.aware import aware_const as aconsts
+from acts.test_utils.wifi.aware import aware_test_utils as autils
+from acts.test_utils.wifi.aware.AwareBaseTest import AwareBaseTest
+from acts.test_utils.wifi.rtt import rtt_const as rconsts
+from acts.test_utils.wifi.rtt import rtt_test_utils as rutils
+from acts.test_utils.wifi.rtt.RttBaseTest import RttBaseTest
+
+
+class StressRangeAwareTest(AwareBaseTest, RttBaseTest):
+ """Test class for stress testing of RTT ranging to Wi-Fi Aware peers."""
+ SERVICE_NAME = "GoogleTestServiceXY"
+
+ def __init__(self, controllers):
+ AwareBaseTest.__init__(self, controllers)
+ RttBaseTest.__init__(self, controllers)
+
+ def setup_test(self):
+ """Manual setup here due to multiple inheritance: explicitly execute the
+ setup method from both parents."""
+ AwareBaseTest.setup_test(self)
+ RttBaseTest.setup_test(self)
+
+ def teardown_test(self):
+ """Manual teardown here due to multiple inheritance: explicitly execute the
+ teardown method from both parents."""
+ AwareBaseTest.teardown_test(self)
+ RttBaseTest.teardown_test(self)
+
+ #############################################################################
+
+ def run_rtt_discovery(self, init_dut, resp_mac=None, resp_peer_id=None):
+ """Perform single RTT measurement, using Aware, from the Initiator DUT to
+ a Responder. The RTT Responder can be specified using its MAC address
+ (obtained using out- of-band discovery) or its Peer ID (using Aware
+ discovery).
+
+ Args:
+ init_dut: RTT Initiator device
+ resp_mac: MAC address of the RTT Responder device
+ resp_peer_id: Peer ID of the RTT Responder device
+ """
+ asserts.assert_true(resp_mac is not None or resp_peer_id is not None,
+ "One of the Responder specifications (MAC or Peer ID)"
+ " must be provided!")
+ if resp_mac is not None:
+ id = init_dut.droid.wifiRttStartRangingToAwarePeerMac(resp_mac)
+ else:
+ id = init_dut.droid.wifiRttStartRangingToAwarePeerId(resp_peer_id)
+ try:
+ event = init_dut.ed.pop_event(rutils.decorate_event(
+ rconsts.EVENT_CB_RANGING_ON_RESULT, id), rutils.EVENT_TIMEOUT)
+ result = event["data"][rconsts.EVENT_CB_RANGING_KEY_RESULTS][0]
+ if resp_mac is not None:
+ rutils.validate_aware_mac_result(result, resp_mac, "DUT")
+ else:
+ rutils.validate_aware_peer_id_result(result, resp_peer_id, "DUT")
+ return result
+ except queue.Empty:
+ return None
+
+ def test_stress_rtt_ib_discovery_set(self):
+ """Perform a set of RTT measurements, using in-band (Aware) discovery, and
+ switching Initiator and Responder roles repeatedly.
+
+ Stress test: repeat ranging operations. Verify rate of success and
+ stability of results.
+ """
+ p_dut = self.android_devices[0]
+ s_dut = self.android_devices[1]
+
+ (p_id, s_id, p_disc_id, s_disc_id,
+ peer_id_on_sub, peer_id_on_pub) = autils.create_discovery_pair(
+ p_dut,
+ s_dut,
+ p_config=autils.add_ranging_to_pub(autils.create_discovery_config(
+ self.SERVICE_NAME, aconsts.PUBLISH_TYPE_UNSOLICITED), True),
+ s_config=autils.add_ranging_to_pub(autils.create_discovery_config(
+ self.SERVICE_NAME, aconsts.SUBSCRIBE_TYPE_PASSIVE), True),
+ device_startup_offset=self.device_startup_offset,
+ msg_id=self.get_next_msg_id())
+
+ results = []
+ start_clock = time.time()
+ iterations_done = 0
+ run_time = 0
+ while iterations_done < self.stress_test_min_iteration_count or (
+ self.stress_test_target_run_time_sec != 0
+ and run_time < self.stress_test_target_run_time_sec):
+ results.append(self.run_rtt_discovery(p_dut, resp_peer_id=peer_id_on_pub))
+ results.append(self.run_rtt_discovery(s_dut, resp_peer_id=peer_id_on_sub))
+
+ iterations_done = iterations_done + 1
+ run_time = time.time() - start_clock
+
+ stats = rutils.extract_stats(results, self.rtt_reference_distance_mm,
+ self.rtt_reference_distance_margin_mm,
+ self.rtt_min_expected_rssi_dbm,
+ summary_only=True)
+ self.log.debug("Stats: %s", stats)
+ asserts.assert_true(stats['num_no_results'] == 0,
+ "Missing (timed-out) results", extras=stats)
+ asserts.assert_false(stats['any_lci_mismatch'],
+ "LCI mismatch", extras=stats)
+ asserts.assert_false(stats['any_lcr_mismatch'],
+ "LCR mismatch", extras=stats)
+ asserts.assert_equal(stats['num_invalid_rssi'], 0, "Invalid RSSI",
+ extras=stats)
+ asserts.assert_true(
+ stats['num_failures'] <=
+ self.rtt_max_failure_rate_two_sided_rtt_percentage
+ * stats['num_results'] / 100,
+ "Failure rate is too high", extras=stats)
+ asserts.assert_true(
+ stats['num_range_out_of_margin']
+ <= self.rtt_max_margin_exceeded_rate_two_sided_rtt_percentage
+ * stats['num_success_results'] / 100,
+ "Results exceeding error margin rate is too high", extras=stats)